news 2026/3/27 12:45:10

用 Python 编写 K8s 漏洞扫描器,自动发现集群内的配置错误

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
用 Python 编写 K8s 漏洞扫描器,自动发现集群内的配置错误

标签:#Kubernetes #Python #DevSecOps #网络安全 #云原生 #自动化运维


🚨 前言:你的集群也许正在“裸奔”

你是否见过这种 YAML 配置?

securityContext:privileged:true# 为了省事,直接给特权runAsUser:0# 直接用 root 跑

这种配置在开发环境很常见,可一旦溜进生产环境,就是严重的容器逃逸风险。
普通的镜像扫描(Image Scanning)只能扫出软件漏洞(CVE),扫不出这种配置漏洞。我们需要的是配置审计(Configuration Audit)


🏗️ 一、 核心架构:基于 K8s API 的巡检

我们要做的不是黑客攻击,而是以“管理员”的身份,通过 Python 调用 K8s 的 API Server,拉取所有资源的配置清单,然后用一套规则去匹配。

扫描器工作流 (Mermaid):

1. 加载 ~/.kube/config
2. List API (Pods/Deployments)
3. 返回 JSON 对象
6. 告警
4. 逐条匹配
5. 命中风险

安全规则库

检测特权容器 (Privileged)

检测 Root 用户运行

检测资源限制缺失 (No Limits)

检测敏感挂载 (Docker Socket)

启动扫描器

K8s 认证

K8s API Server

Python 分析引擎

生成 JSON/HTML 报告

钉钉/Slack 通知


🛠️ 二、 环境准备

我们需要安装 Python 的 K8s 官方客户端:

pipinstallkubernetes

确保你的机器上配置了~/.kube/config,且有权限读取集群信息。


💻 三、 代码实战:编写扫描核心

我们将实现一个简单的K8sScanner类,演示如何查找“特权容器”“未设置资源限制”的 Pod。

1. 初始化连接
fromkubernetesimportclient,configclassK8sScanner:def__init__(self):# 1. 加载 kubeconfig,如果在 Pod 内部运行则用 load_incluster_config()try:config.load_kube_config()print("✅ 成功连接到 K8s 集群")exceptExceptionase:print(f"❌ 连接失败:{e}")exit(1)self.v1=client.CoreV1Api()
2. 核心检测逻辑
defscan_privileged_pods(self,namespace='default'):""" 规则:检测开启了 privileged=true 的危险容器 """print(f"🔍 开始扫描{namespace}命名空间的特权容器...")try:pods=self.v1.list_namespaced_pod(namespace)risks=[]forpodinpods.items:pod_name=pod.metadata.nameforcontainerinpod.spec.containers:# 获取 security_contextsec_ctx=container.security_contextifsec_ctxandsec_ctx.privileged:risk_msg=f"🔴 [高危] Pod:{pod_name}| 容器:{container.name}| 问题: 开启了特权模式!"risks.append(risk_msg)print(risk_msg)ifnotrisks:print("✅ 未发现特权容器。")returnrisksexceptExceptionase:print(f"扫描出错:{e}")return[]defscan_resource_limits(self,namespace='default'):""" 规则:检测未设置 CPU/Memory Limit 的容器 (容易导致节点雪崩) """print(f"🔍 开始扫描{namespace}命名空间的资源限制...")pods=self.v1.list_namespaced_pod(namespace)forpodinpods.items:forcontainerinpod.spec.containers:resources=container.resources# 检查 limits 是否存在ifresources.limitsisNone:print(f"⚠️ [中危] Pod:{pod.metadata.name}| 容器:{container.name}| 问题: 未设置资源限制 (OOM风险)")
3. 运行扫描
if__name__=="__main__":scanner=K8sScanner()# 扫描 default 命名空间scanner.scan_privileged_pods("default")scanner.scan_resource_limits("default")# 你可以遍历所有 namespace# namespaces = [ns.metadata.name for ns in scanner.v1.list_namespace().items]

🛡️ 四、 进阶:DevSecOps 的闭环

写好脚本只是第一步,如何把它融入到运维体系中?

  1. CI/CD 集成
    把这个脚本放在 Jenkins 或 GitLab CI 里。
    当开发者提交 YAML 文件时,先跑一遍 Python 脚本(解析本地 YAML 文件而非连接集群),如果有高危配置,直接打断构建,禁止发布
  2. 准入控制 (Admission Controller)
    如果你想做的更绝一点,可以了解OPA (Open Policy Agent)Kyverno。它们是 K8s 原生的拦截器,可以在 API 请求阶段就拒绝不合规的 Pod 创建。
    我们的 Python 脚本适合做“事后审计”,OPA 适合做“事前拦截”。
  3. 自定义规则扩展
  • 镜像来源检查:必须来自公司私有 Harbor,禁止使用docker.io公共镜像。
  • 探针检查:检查 Deployment 是否配置了livenessProbereadinessProbe
  • 标签规范:检查是否包含app,owner,env等必填 Label。

🎯 总结

通过 Pythonkubernetes库,我们将黑盒的集群配置变成了可编程、可审计的数据对象
这不仅是一个安全工具,更是一个运维治理的利器。

别等到黑客通过一个特权 Pod 拿下了你的 Master 节点,才后悔没有早点做配置审计。

Next Step:
打开你的 IDE,把上面的代码复制进去,运行一下。看看你的开发环境集群里,到底藏了多少个没设资源限制的“裸奔”容器?结果可能会让你大吃一惊!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/19 21:43:11

AI 技术在CRM 系统中的应用

AI 技术已经从 CRM 系统(客户关系管理)的“插件”进化为了其核心引擎。现代 CRM 不再只是一个存储客户资料的静态数据库,而是一个能够主动思考、预测并执行任务的“智能助手”。以下是 AI 技术在CRM 系统中的核心应用场景:1. 销售…

作者头像 李华
网站建设 2026/3/27 3:37:46

AVENTICS 5610141310控制器

AVENTICS 5610141310 是一款气动元件,通常用于工业自动化系统中的气动控制。该产品属于 AVENTICS(现为 Emerson 自动化解决方案的一部分)的系列产品,专为高效、可靠的气动应用设计。主要特点类型:气动阀或气动控制元件…

作者头像 李华
网站建设 2026/3/19 9:37:24

Dify二开系列:从LightRAG到多模态RAG,全能化进化之路

一、系列回顾:从"能用"到"精准" 前两篇分别讲了Dify的企业级改造和LightRAG知识图谱RAG。 第一篇:Dify二开实战。双层配额管理,防止Token费用失控;账号同步和软删除,解决运维痛点。 第二篇:用LightRAG的知识图谱RAG,准确率从75%提升到90%。 这两个改造…

作者头像 李华
网站建设 2026/3/26 18:18:12

真实业务场景死锁案例:电商订单处理

1. 业务场景介绍 场景:电商系统的订单确认流程,需要处理三个核心资源: 订单锁:防止同一订单被重复处理库存锁:防止库存超卖支付锁:防止重复支付 2. 死锁发生的真实代码 2.1 订单处理服务 Service publ…

作者头像 李华