
08 - MLOps and Engineering in Practice: Workflow Orchestration with Kubeflow


Workflow Orchestration: Kubeflow (Kubernetes-native ML pipelines, componentization, distributed training)

1. Kubeflow Overview

1.1 What is Kubeflow?

import matplotlib.pyplot as plt
import warnings

warnings.filterwarnings('ignore')

print("=" * 60)
print("Kubeflow: a Kubernetes-native ML platform")
print("=" * 60)

# Kubeflow component diagram
fig, ax = plt.subplots(figsize=(12, 10))
ax.axis('off')

# Core components and their layout positions
components = {
    'Kubeflow\nPipelines': (0.2, 0.8),
    'Katib\n(HPO)': (0.5, 0.8),
    'KFServing': (0.8, 0.8),
    'Notebooks': (0.2, 0.55),
    'Training\n(TFJob/PyTorchJob)': (0.5, 0.55),
    'Multi-Tenancy': (0.8, 0.55),
    'Istio': (0.2, 0.3),
    'Argo': (0.5, 0.3),
    'Kubernetes': (0.8, 0.3),
}

for name, (x, y) in components.items():
    circle = plt.Circle((x, y), 0.1, color='lightblue', ec='black')
    ax.add_patch(circle)
    ax.text(x, y, name, ha='center', va='center', fontsize=7)

ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_title('Kubeflow component architecture', fontsize=14)
plt.tight_layout()
plt.show()

print("\n💡 Core strengths of Kubeflow:")
print("  - Kubernetes-native, cloud-native architecture")
print("  - End-to-end ML workflows")
print("  - Distributed training support")
print("  - Extensible, component-based design")
print("  - Multi-framework support (TensorFlow, PyTorch, MXNet)")

2. Installing Kubeflow

2.1 Installation and Configuration

def kubeflow_installation():
    """Kubeflow installation"""
    print("\n" + "=" * 60)
    print("Installing Kubeflow")
    print("=" * 60)
    code = """
    # 1. Install with kfctl
    export KF_NAME="kubeflow"
    export BASE_DIR="/opt/kubeflow"
    export KF_DIR=${BASE_DIR}/${KF_NAME}

    # Download kfctl
    wget https://github.com/kubeflow/kfctl/releases/download/v1.7.0/kfctl_v1.7.0-0-g0e3e3a4_linux.tar.gz
    tar -xvf kfctl_v1.7.0-0-g0e3e3a4_linux.tar.gz

    # Deploy (CONFIG_URI points at a KfDef configuration manifest)
    ${KF_DIR}/kfctl apply -V -f ${CONFIG_URI}

    # 2. Local testing with Minikube
    # minikube start --cpus 4 --memory 8192 --disk-size 50g
    # kubectl create ns kubeflow
    # kfctl apply -V -f ${CONFIG_URI}

    # 3. Using Kind
    # kind create cluster --name kubeflow --config kind-config.yaml
    # kubectl create ns kubeflow
    # kfctl apply -V -f ${CONFIG_URI}

    # 4. Access the Kubeflow UI
    # kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80
    # Open http://localhost:8080 in a browser

    # 5. Check pod status
    # kubectl get pods -n kubeflow

    # 6. Uninstall
    # ${KF_DIR}/kfctl delete -V -f ${CONFIG_URI}
    """
    print(code)

kubeflow_installation()
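With the port-forward from step 4 in place, you can sanity-check the Pipelines backend from Python. A minimal sketch with the KFP SDK — the host URL is an assumption based on the default Kubeflow Pipelines routing behind the Istio ingress gateway:

import kfp

# Connect to the KFP API behind the istio-ingressgateway port-forward
# (the /pipeline path assumes the default Kubeflow Pipelines routing)
client = kfp.Client(host='http://localhost:8080/pipeline')

# List experiments as a simple connectivity check
print(client.list_experiments())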

3. Kubeflow Pipelines

3.1 Defining Components

def kubeflow_pipelines():
    """Kubeflow Pipelines components"""
    print("\n" + "=" * 60)
    print("Kubeflow Pipelines components")
    print("=" * 60)
    code = """
    import kfp
    from kfp import dsl
    from kfp.dsl import component, Input, Output, Dataset, Model
    from typing import NamedTuple

    # 1. Define a component with the @component decorator
    @component(
        packages_to_install=['pandas', 'numpy', 'scikit-learn'],
        base_image='python:3.9'
    )
    def preprocess_op(
        input_data: Input[Dataset],
        output_train: Output[Dataset],
        output_test: Output[Dataset],
        test_size: float = 0.2
    ):
        import pandas as pd
        from sklearn.model_selection import train_test_split

        data = pd.read_csv(input_data.path)
        X = data.drop('target', axis=1)
        y = data['target']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )
        train_data = pd.concat([X_train, y_train], axis=1)
        test_data = pd.concat([X_test, y_test], axis=1)
        train_data.to_csv(output_train.path, index=False)
        test_data.to_csv(output_test.path, index=False)

    # 2. A component that returns multiple values
    @component(
        packages_to_install=['pandas', 'scikit-learn'],
        base_image='python:3.9'
    )
    def train_op(
        train_data: Input[Dataset],
        model: Output[Model],
        n_estimators: int = 100,
        max_depth: int = 10
    ) -> NamedTuple('Outputs', [('accuracy', float), ('model_path', str)]):
        import pandas as pd
        from sklearn.ensemble import RandomForestClassifier
        import joblib
        from collections import namedtuple

        data = pd.read_csv(train_data.path)
        X = data.drop('target', axis=1)
        y = data['target']

        clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
        clf.fit(X, y)

        model_path = model.path + "/model.joblib"
        joblib.dump(clf, model_path)
        accuracy = clf.score(X, y)

        Outputs = namedtuple('Outputs', ['accuracy', 'model_path'])
        return Outputs(accuracy, model_path)

    # 3. Using ContainerOp (low-level KFP v1 API)
    def train_op_container(train_data_path, n_estimators=100):
        return dsl.ContainerOp(
            name='train',
            image='python:3.9',
            command=['python', '-c'],
            arguments=[f'''
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib
    data = pd.read_csv("{train_data_path}")
    X = data.drop('target', axis=1)
    y = data['target']
    model = RandomForestClassifier(n_estimators={n_estimators})
    model.fit(X, y)
    joblib.dump(model, "/model/model.pkl")
    print("Model saved")
    '''],
            file_outputs={'model': '/model'}
        )
    """
    print(code)

kubeflow_pipelines()
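Besides Python-function components, reusable components can also be loaded from YAML specifications, which is convenient for sharing container components across pipelines and teams. A minimal sketch using kfp.components.load_component_from_text — the YAML follows the classic (v1-style) component spec, and the component itself (name, image, command) is an illustrative placeholder:

from kfp import components

# Load a reusable container component from an inline YAML spec
# (the same spec could live in a shared component.yaml file).
echo_op = components.load_component_from_text("""
name: Echo
description: Prints a message.
inputs:
- {name: message, type: String}
implementation:
  container:
    image: python:3.9
    command: [python, -c, 'import sys; print(sys.argv[1])', {inputValue: message}]
""")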

3.2 Defining Pipelines

def pipeline_definition():
    """Pipeline definition"""
    print("\n" + "=" * 60)
    print("Pipeline definition")
    print("=" * 60)
    code = """
    # 1. Define a pipeline
    @dsl.pipeline(
        name='ML Training Pipeline',
        description='End-to-end machine learning pipeline',
        pipeline_root='gs://my-bucket/pipeline-root'
    )
    def ml_pipeline(
        data_path: str = 'gs://bucket/data.csv',
        test_size: float = 0.2,
        n_estimators: int = 100,
        max_depth: int = 10
    ):
        # Load data
        load_task = load_data_op(data_path)

        # Preprocess
        preprocess_task = preprocess_op(
            input_data=load_task.outputs['data'],
            test_size=test_size
        )

        # Train
        train_task = train_op(
            train_data=preprocess_task.outputs['output_train'],
            n_estimators=n_estimators,
            max_depth=max_depth
        )

        # Evaluate
        evaluate_task = evaluate_op(
            test_data=preprocess_task.outputs['output_test'],
            model=train_task.outputs['model']
        )

        # Conditional deployment
        with dsl.Condition(evaluate_task.outputs['accuracy'] > 0.85):
            deploy_task = deploy_op(model=train_task.outputs['model'])

    # 2. A pipeline that fans out with a loop
    @dsl.pipeline(name='Hyperparameter Tuning Pipeline')
    def hp_tuning_pipeline(data_path: str = 'gs://bucket/data.csv'):
        load_task = load_data_op(data_path)
        preprocess_task = preprocess_op(load_task.outputs['data'])

        # Fan out training over a compile-time list of candidate values.
        # (A runtime pipeline parameter cannot be iterated with a Python
        # for-loop; use dsl.ParallelFor for lists supplied at run time.)
        train_tasks = []
        for n_estimators in [50, 100, 150, 200]:
            train_task = train_op(
                train_data=preprocess_task.outputs['output_train'],
                n_estimators=n_estimators
            )
            train_tasks.append(train_task)

        # Pick the best of the parallel runs
        best_model_task = select_best_model_op(
            models=[t.outputs['model'] for t in train_tasks]
        )

    # 3. A resource-aware pipeline
    @dsl.pipeline(name='Resource-aware Pipeline')
    def resource_pipeline(data_path: str = 'gs://bucket/data.csv'):
        load_task = load_data_op(data_path).set_cpu_request('1').set_memory_request('2Gi')
        preprocess_task = preprocess_op(load_task.outputs['data']).set_gpu_limit(0)
        train_task = train_op(
            preprocess_task.outputs['output_train']
        ).set_cpu_request('4').set_memory_request('8Gi').set_gpu_limit(1)

        # Retry policy
        train_task = train_task.set_retry(3)

    # 4. Compile the pipeline
    # kfp.compiler.Compiler().compile(ml_pipeline, 'pipeline.yaml')

    # 5. Run the pipeline
    import kfp

    client = kfp.Client()
    run = client.create_run_from_pipeline_func(
        ml_pipeline,
        arguments={
            'data_path': 'gs://bucket/data.csv',
            'test_size': 0.2,
            'n_estimators': 100
        },
        experiment_name='ml_experiment'
    )
    """
    print(code)

pipeline_definition()
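For teams that version pipelines rather than submit them ad hoc, the compiled package can be registered with the Pipelines backend and run from there. A minimal sketch, assuming the ml_pipeline function from above and the port-forwarded host from section 2:

import kfp
from kfp import compiler

# Compile the pipeline function into a portable package
compiler.Compiler().compile(ml_pipeline, 'ml_pipeline.yaml')

client = kfp.Client(host='http://localhost:8080/pipeline')

# Register the package so it appears in the Pipelines UI
client.upload_pipeline('ml_pipeline.yaml', pipeline_name='ml-training-pipeline')

# Or start a run directly from the compiled package
run = client.create_run_from_pipeline_package(
    'ml_pipeline.yaml',
    arguments={'data_path': 'gs://bucket/data.csv', 'n_estimators': 100},
)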

4. Distributed Training

4.1 Training Operators: TFJob, PyTorchJob, and MPIJob

def distributed_training():
    """Distributed training"""
    print("\n" + "=" * 60)
    print("Distributed training")
    print("=" * 60)
    code = """
    # 1. TFJob definition
    apiVersion: kubeflow.org/v1
    kind: TFJob
    metadata:
      name: distributed-tfjob
    spec:
      tfReplicaSpecs:
        Chief:
          replicas: 1
          template:
            spec:
              containers:
                - name: tensorflow
                  image: tensorflow/tensorflow:2.13.0-gpu
                  command:
                    - python
                    - /app/distributed_train.py
                  resources:
                    limits:
                      nvidia.com/gpu: 1
        Worker:
          replicas: 2
          template:
            spec:
              containers:
                - name: tensorflow
                  image: tensorflow/tensorflow:2.13.0-gpu
                  command:
                    - python
                    - /app/distributed_train.py
                  resources:
                    limits:
                      nvidia.com/gpu: 1
        PS:
          replicas: 1
          template:
            spec:
              containers:
                - name: tensorflow
                  image: tensorflow/tensorflow:2.13.0
                  command:
                    - python
                    - /app/parameter_server.py

    # 2. PyTorchJob definition
    apiVersion: kubeflow.org/v1
    kind: PyTorchJob
    metadata:
      name: distributed-pytorchjob
    spec:
      pytorchReplicaSpecs:
        Master:
          replicas: 1
          template:
            spec:
              containers:
                - name: pytorch
                  image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
                  command:
                    - python
                    - -m
                    - torch.distributed.run
                    - --nnodes=3
                    - --nproc_per_node=1
                    - --rdzv_endpoint=$(MASTER_ADDR):29500
                    - distributed_train.py
                  resources:
                    limits:
                      nvidia.com/gpu: 1
        Worker:
          replicas: 2
          template:
            spec:
              containers:
                - name: pytorch
                  image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
                  command:
                    - python
                    - -m
                    - torch.distributed.run
                    - --nnodes=3
                    - --nproc_per_node=1
                    - --rdzv_endpoint=$(MASTER_ADDR):29500
                    - distributed_train.py
                  resources:
                    limits:
                      nvidia.com/gpu: 1

    # 3. MPIJob definition
    apiVersion: kubeflow.org/v1
    kind: MPIJob
    metadata:
      name: distributed-mpijob
    spec:
      slotsPerWorker: 1
      runPolicy:
        cleanPodPolicy: Running
      mpiReplicaSpecs:
        Launcher:
          replicas: 1
          template:
            spec:
              containers:
                - name: mpi-launcher
                  image: mpioperator/tensorflow-benchmarks:latest
                  command:
                    - mpirun
                    - --allow-run-as-root
                    - -np
                    - "4"
                    - --hostfile
                    - /etc/mpi/hostfile
                    - python
                    - distributed_train.py
        Worker:
          replicas: 2
          template:
            spec:
              containers:
                - name: mpi-worker
                  image: mpioperator/tensorflow-benchmarks:latest
                  command:
                    - /usr/sbin/sshd
                    - -De
    """
    print(code)

distributed_training()
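The TFJob operator injects a TF_CONFIG environment variable into every replica, so the training script only needs to pick a distribution strategy and TensorFlow discovers the cluster topology from it. A minimal sketch of what /app/distributed_train.py might contain — the model and the in-memory data are illustrative:

import numpy as np
import tensorflow as tf

# MultiWorkerMirroredStrategy reads the cluster layout from the
# TF_CONFIG environment variable that the TFJob operator injects.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Build the model inside the strategy scope so variables are mirrored
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy',
                  metrics=['accuracy'])

# Toy in-memory data for illustration; a real job would stream
# training data from shared storage (GCS, S3, NFS, ...)
X = np.random.rand(1024, 20).astype('float32')
y = (np.random.rand(1024) > 0.5).astype('float32')

model.fit(X, y, epochs=3, batch_size=64)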

5. Hyperparameter Tuning with Katib

5.1 Hyperparameter Search

def katib_hpo():
    """Katib hyperparameter tuning"""
    print("\n" + "=" * 60)
    print("Katib hyperparameter tuning")
    print("=" * 60)
    code = """
    # 1. Katib Experiment definition
    apiVersion: kubeflow.org/v1beta1
    kind: Experiment
    metadata:
      name: random-forest-tuning
    spec:
      objective:
        type: maximize
        goal: 0.95
        objectiveMetricName: accuracy
      algorithm:
        algorithmName: bayesianoptimization
      parallelTrialCount: 3
      maxTrialCount: 12
      maxFailedTrialCount: 3
      parameters:
        - name: n_estimators
          parameterType: int
          feasibleSpace:
            min: "50"
            max: "300"
        - name: max_depth
          parameterType: int
          feasibleSpace:
            min: "5"
            max: "20"
        - name: min_samples_split
          parameterType: int
          feasibleSpace:
            min: "2"
            max: "10"
        - name: max_features
          parameterType: categorical
          feasibleSpace:
            list:
              - "sqrt"
              - "log2"
      trialTemplate:
        primaryContainerName: training-container
        trialParameters:
          - name: n_estimators
            description: Number of trees
            reference: n_estimators
          - name: max_depth
            description: Max depth
            reference: max_depth
          - name: min_samples_split
            description: Min samples split
            reference: min_samples_split
          - name: max_features
            description: Max features
            reference: max_features
        trialSpec:
          apiVersion: batch/v1
          kind: Job
          spec:
            template:
              spec:
                restartPolicy: Never
                containers:
                  - name: training-container
                    image: training-image:latest
                    command:
                      - python
                      - train.py
                      - --n_estimators=${trialParameters.n_estimators}
                      - --max_depth=${trialParameters.max_depth}
                      - --min_samples_split=${trialParameters.min_samples_split}
                      - --max_features=${trialParameters.max_features}

    # 2. Create the Experiment
    # kubectl apply -f experiment.yaml

    # 3. Check Experiment status
    # kubectl get experiments
    # kubectl describe experiment random-forest-tuning

    # 4. Inspect the trials
    # kubectl get trials -l experiment=random-forest-tuning

    # 5. Using the Python SDK
    from kubeflow.katib import KatibClient

    client = KatibClient()
    client.create_experiment('experiment.yaml')
    experiments = client.list_experiments(namespace='kubeflow')
    best_trial = client.get_optimal_hyperparameters('random-forest-tuning')
    print(f"Best parameters: {best_trial}")
    """
    print(code)

katib_hpo()
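For the Experiment above to work, each trial has to report the objective metric. With Katib's default StdOut metrics collector, that means train.py prints a name=value line whose name matches objectiveMetricName. A hedged sketch of such a trial script — the iris dataset stands in for real training data:

import argparse

from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# Parse the hyperparameters Katib substitutes into the trial command
parser = argparse.ArgumentParser()
parser.add_argument('--n_estimators', type=int, default=100)
parser.add_argument('--max_depth', type=int, default=10)
parser.add_argument('--min_samples_split', type=int, default=2)
parser.add_argument('--max_features', type=str, default='sqrt')
args = parser.parse_args()

# Illustrative dataset; a real trial would load the project's data
X, y = load_iris(return_X_y=True)

clf = RandomForestClassifier(
    n_estimators=args.n_estimators,
    max_depth=args.max_depth,
    min_samples_split=args.min_samples_split,
    max_features=args.max_features,
)
accuracy = cross_val_score(clf, X, y, cv=3).mean()

# Katib's default StdOut metrics collector parses "name=value" lines;
# the name must match objectiveMetricName in the Experiment spec.
print(f"accuracy={accuracy}")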

6. Model Deployment with KFServing (KServe)

6.1 Serving Models

def kfserving():
    """KFServing model deployment"""
    print("\n" + "=" * 60)
    print("KFServing model deployment")
    print("=" * 60)
    code = """
    # 1. InferenceService definition
    apiVersion: serving.kserve.io/v1beta1
    kind: InferenceService
    metadata:
      name: sklearn-model
    spec:
      predictor:
        sklearn:
          storageUri: gs://kfserving-examples/models/sklearn/iris
          resources:
            limits:
              cpu: 100m
              memory: 256Mi
            requests:
              cpu: 100m
              memory: 256Mi

    # 2. Deploying a PyTorch model
    apiVersion: serving.kserve.io/v1beta1
    kind: InferenceService
    metadata:
      name: pytorch-model
    spec:
      predictor:
        pytorch:
          storageUri: gs://kfserving-examples/models/pytorch/cifar10
          resources:
            limits:
              nvidia.com/gpu: 1

    # 3. Deploying a TensorFlow model
    apiVersion: serving.kserve.io/v1beta1
    kind: InferenceService
    metadata:
      name: tensorflow-model
    spec:
      predictor:
        tensorflow:
          storageUri: gs://kfserving-examples/models/tensorflow/mnist
          resources:
            limits:
              cpu: 2
              memory: 4Gi

    # 4. Custom model server
    apiVersion: serving.kserve.io/v1beta1
    kind: InferenceService
    metadata:
      name: custom-model
    spec:
      predictor:
        containers:
          - name: custom-container
            image: custom-model:latest
            command:
              - python
              - -m
              - model_server
            args:
              - --model_name=custom
              - --model_dir=/mnt/models
            env:
              - name: STORAGE_URI
                value: gs://my-bucket/models
            resources:
              limits:
                cpu: 2
                memory: 4Gi

    # 5. Canary rollout (shift 10% of traffic to the new revision)
    apiVersion: serving.kserve.io/v1beta1
    kind: InferenceService
    metadata:
      name: canary-model
    spec:
      predictor:
        canaryTrafficPercent: 10
        sklearn:
          storageUri: gs://kfserving-examples/models/sklearn/iris-v2

    # 6. Example request
    # curl -X POST http://sklearn-model.default.example.com/v1/models/sklearn-model:predict \\
    #   -H "Content-Type: application/json" \\
    #   -d '{"instances": [[6.8, 2.8, 4.8, 1.4]]}'
    """
    print(code)

kfserving()
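The curl call at the end can equally be issued from Python. A minimal sketch with requests, reusing the hostname and payload from the example above:

import requests

# V1 inference protocol: POST /v1/models/<name>:predict
url = "http://sklearn-model.default.example.com/v1/models/sklearn-model:predict"
payload = {"instances": [[6.8, 2.8, 4.8, 1.4]]}

resp = requests.post(url, json=payload, timeout=10)
resp.raise_for_status()
print(resp.json())  # e.g. {"predictions": [1]}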

7. Complete Pipeline Example

7.1 End-to-End Pipeline

def complete_pipeline():
    """Complete pipeline example"""
    print("\n" + "=" * 60)
    print("Complete Kubeflow pipeline")
    print("=" * 60)
    code = """
    import kfp
    from kfp import dsl
    from kfp.dsl import component, Input, Output, Dataset, Model, Metrics

    # Define all components
    @component(
        packages_to_install=['pandas', 'numpy', 'scikit-learn'],
        base_image='python:3.9'
    )
    def data_loader_op(
        data_url: str,
        output_data: Output[Dataset]
    ):
        import pandas as pd
        data = pd.read_csv(data_url)
        data.to_csv(output_data.path, index=False)

    @component(
        packages_to_install=['pandas', 'scikit-learn'],
        base_image='python:3.9'
    )
    def preprocessor_op(
        input_data: Input[Dataset],
        output_train: Output[Dataset],
        output_test: Output[Dataset],
        test_size: float = 0.2
    ):
        import pandas as pd
        from sklearn.model_selection import train_test_split

        data = pd.read_csv(input_data.path)
        X = data.drop('target', axis=1)
        y = data['target']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42
        )
        train_data = pd.concat([X_train, y_train], axis=1)
        test_data = pd.concat([X_test, y_test], axis=1)
        train_data.to_csv(output_train.path, index=False)
        test_data.to_csv(output_test.path, index=False)

    @component(
        packages_to_install=['pandas', 'scikit-learn', 'joblib'],
        base_image='python:3.9'
    )
    def trainer_op(
        train_data: Input[Dataset],
        model: Output[Model],
        n_estimators: int = 100,
        max_depth: int = 10
    ):
        import pandas as pd
        from sklearn.ensemble import RandomForestClassifier
        import joblib

        data = pd.read_csv(train_data.path)
        X = data.drop('target', axis=1)
        y = data['target']

        clf = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=42
        )
        clf.fit(X, y)
        joblib.dump(clf, model.path + "/model.joblib")

    @component(
        packages_to_install=['scikit-learn', 'joblib', 'pandas'],
        base_image='python:3.9'
    )
    def evaluator_op(
        test_data: Input[Dataset],
        model: Input[Model],
        metrics: Output[Metrics]
    ) -> float:
        import pandas as pd
        from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
        import joblib

        data = pd.read_csv(test_data.path)
        X = data.drop('target', axis=1)
        y = data['target']

        clf = joblib.load(model.path + "/model.joblib")
        y_pred = clf.predict(X)

        accuracy = accuracy_score(y, y_pred)
        precision = precision_score(y, y_pred, average='weighted')
        recall = recall_score(y, y_pred, average='weighted')
        f1 = f1_score(y, y_pred, average='weighted')

        metrics.log_metric('accuracy', accuracy)
        metrics.log_metric('precision', precision)
        metrics.log_metric('recall', recall)
        metrics.log_metric('f1', f1)

        # Return accuracy as a plain float so the pipeline can branch on it
        return accuracy

    @component(
        packages_to_install=['google-cloud-storage'],
        base_image='python:3.9'
    )
    def deployer_op(
        model: Input[Model],
        model_name: str,
        bucket: str
    ):
        from google.cloud import storage

        client = storage.Client()
        bucket_obj = client.bucket(bucket)
        blob = bucket_obj.blob(f"models/{model_name}/model.joblib")
        blob.upload_from_filename(model.path + "/model.joblib")
        print(f"Model deployed to gs://{bucket}/models/{model_name}/model.joblib")

    @dsl.pipeline(
        name='Complete ML Pipeline',
        description='End-to-end machine learning pipeline on Kubeflow',
        pipeline_root='gs://my-bucket/pipeline-root'
    )
    def complete_ml_pipeline(
        data_url: str = 'gs://bucket/data.csv',
        test_size: float = 0.2,
        n_estimators: int = 100,
        max_depth: int = 10,
        model_name: str = 'random_forest_v1',
        deploy_bucket: str = 'my-model-bucket'
    ):
        # Load data
        load_task = data_loader_op(data_url=data_url)

        # Preprocess
        preprocess_task = preprocessor_op(
            input_data=load_task.outputs['output_data'],
            test_size=test_size
        )

        # Train
        train_task = trainer_op(
            train_data=preprocess_task.outputs['output_train'],
            n_estimators=n_estimators,
            max_depth=max_depth
        )

        # Evaluate
        evaluate_task = evaluator_op(
            test_data=preprocess_task.outputs['output_test'],
            model=train_task.outputs['model']
        )

        # Deploy only if accuracy clears the bar
        # (branch on the float return value, not the Metrics artifact)
        with dsl.Condition(evaluate_task.output > 0.85):
            deploy_task = deployer_op(
                model=train_task.outputs['model'],
                model_name=model_name,
                bucket=deploy_bucket
            )

    # Compile and run
    if __name__ == '__main__':
        kfp.compiler.Compiler().compile(complete_ml_pipeline, 'pipeline.yaml')

        client = kfp.Client()
        run = client.create_run_from_pipeline_func(
            complete_ml_pipeline,
            arguments={
                'data_url': 'gs://bucket/data.csv',
                'test_size': 0.2,
                'n_estimators': 100,
                'max_depth': 10
            },
            experiment_name='production_experiment'
        )
    """
    print(code)

complete_pipeline()
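In production, a pipeline like this usually runs on a schedule rather than on demand. A minimal sketch using the KFP client's recurring-run API, assuming the pipeline.yaml compiled above; note that the experiment's .id attribute follows the kfp 1.x SDK (2.x renames it to experiment_id):

import kfp

client = kfp.Client()

# Reuse (or create) an experiment to group the scheduled runs
experiment = client.create_experiment('production_experiment')

# Run the compiled pipeline every day at 02:00
# (KFP cron expressions include a leading seconds field)
recurring = client.create_recurring_run(
    experiment_id=experiment.id,
    job_name='nightly-training',
    cron_expression='0 0 2 * * *',
    pipeline_package_path='pipeline.yaml',
    params={'data_url': 'gs://bucket/data.csv'},
)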

8. Summary

Component        | Function                | Typical use case
-----------------|-------------------------|-------------------------
Pipelines        | Workflow orchestration  | ML pipelines
Katib            | Hyperparameter tuning   | Model optimization
KFServing        | Model serving           | Production inference
TFJob/PyTorchJob | Distributed training    | Large-scale training
Notebooks        | Development environment | Interactive development

Kubeflow vs. Airflow:

  • Kubeflow: Kubernetes-native; suited to large-scale ML workloads
  • Airflow: general-purpose workflow engine; suited to data ETL and scheduling