
Machine Learning Pipelines: From Data to Deployment


张小明 · Front-end Engineer


1. Technical Analysis

1.1 Machine Learning Pipeline Architecture

A complete machine learning pipeline consists of several stages:

ML Pipeline: Data Ingestion → Data Cleaning → Feature Engineering → Model Training → Model Evaluation → Deployment → Monitoring
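
These stages can be read as a chain of functions. A minimal skeleton (every function here is a placeholder, not a real implementation):

# Placeholder skeleton: each stage is a function, chained end to end.
def run_pipeline(source):
    raw = ingest(source)                  # data ingestion (e.g. Kafka, Flume)
    clean = clean_data(raw)               # data cleaning
    features = engineer_features(clean)   # feature engineering
    model = train(features)               # model training
    metrics = evaluate(model, features)   # model evaluation
    deploy(model)                         # deployment
    monitor(model)                        # monitoring
    return metrics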

1.2 Pipeline Component Comparison

Component           | Function                | Common Tools
Data ingestion      | Acquire data            | Kafka, Flume
Data cleaning       | Preprocessing           | Pandas, PySpark
Feature engineering | Feature extraction      | Scikit-learn, Feast
Model training      | Train models            | PyTorch, TensorFlow
Model evaluation    | Evaluation metrics      | Scikit-learn, MLflow
Model deployment    | Serve models online     | Flask, TorchServe
Model monitoring    | Performance monitoring  | Prometheus, Evidently
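
For the ingestion stage, a minimal consumer sketch using the kafka-python package; the topic name and broker address are made up:

# Minimal Kafka ingestion sketch (kafka-python; topic/broker are hypothetical).
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'raw-events',                          # hypothetical topic
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)
for message in consumer:
    record = message.value                 # one raw record per message
    # hand off to the cleaning stage here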

1.3 Pipeline Tool Comparison

Tool     | Positioning           | Characteristics
Kubeflow | End-to-end platform   | Cloud native
Airflow  | Orchestration         | Task scheduling
MLflow   | Experiment management | Tracking and logging
Prefect  | Workflow              | Python native
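
Prefect's Python-native style is easy to show. A minimal sketch assuming Prefect 2.x, with placeholder task bodies:

# Minimal Prefect 2.x flow sketch; the task bodies are placeholders.
from prefect import flow, task

@task
def ingest():
    return "raw data"

@task
def train(data):
    return "model"

@flow
def ml_pipeline():
    data = ingest()
    return train(data)

if __name__ == "__main__":
    ml_pipeline()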

2. Core Implementation

2.1 Data Preprocessing Pipeline

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder


class DataPipeline:
    """Chains transformers: objects with fit_transform/transform, or plain callables."""

    def __init__(self):
        self.transformers = []

    def add_transformer(self, transformer):
        self.transformers.append(transformer)

    def fit(self, data):
        for transformer in self.transformers:
            # Prefer a stateful transformer's fit_transform; otherwise call it directly.
            if hasattr(transformer, 'fit_transform'):
                data = transformer.fit_transform(data)
            else:
                data = transformer(data)
        return data

    def transform(self, data):
        for transformer in self.transformers:
            if hasattr(transformer, 'transform'):
                data = transformer.transform(data)
            else:
                data = transformer(data)
        return data


class DataCleaner:
    """Stateless cleaning: drop missing rows and duplicates, strip stray whitespace."""

    def __call__(self, df):
        df = df.dropna()
        df = df.drop_duplicates()
        for col in df.columns:
            if df[col].dtype == 'object':
                df[col] = df[col].str.strip()
        return df


class FeatureEngineer:
    """Scales numeric columns and label-encodes categorical columns."""

    def __init__(self):
        self.scaler = StandardScaler()
        self.encoders = {}  # one LabelEncoder per categorical column

    def fit_transform(self, df):
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=['object']).columns
        if len(numeric_cols) > 0:
            df[numeric_cols] = self.scaler.fit_transform(df[numeric_cols])
        for col in categorical_cols:
            self.encoders[col] = LabelEncoder()
            df[col] = self.encoders[col].fit_transform(df[col])
        return df

    def transform(self, df):
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            df[numeric_cols] = self.scaler.transform(df[numeric_cols])
        for col, encoder in self.encoders.items():
            df[col] = encoder.transform(df[col])
        return df
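
A quick usage sketch; the DataFrame and its columns are made up:

# Hypothetical data: one numeric and one categorical column.
df = pd.DataFrame({
    'age': [25, 32, 47, 32],
    'city': [' beijing', 'shanghai ', 'shenzhen', 'shanghai '],
})
pipeline = DataPipeline()
pipeline.add_transformer(DataCleaner())
pipeline.add_transformer(FeatureEngineer())
train_ready = pipeline.fit(df.copy())   # fit on training data
# later: pipeline.transform(new_df) reuses the fitted scaler/encoders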

2.2 Model Training Pipeline

import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score


class ModelTrainingPipeline:
    def __init__(self, model, optimizer, loss_fn):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn

    def train(self, X, y, epochs=100, batch_size=32):
        # Hold out 20% of the data for per-epoch validation.
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)
        X_train = torch.tensor(X_train.values, dtype=torch.float32)
        y_train = torch.tensor(y_train.values, dtype=torch.long)
        X_val = torch.tensor(X_val.values, dtype=torch.float32)
        y_val = torch.tensor(y_val.values, dtype=torch.long)
        for epoch in range(epochs):
            self.model.train()
            # Reshuffle each epoch and iterate in mini-batches.
            permutation = torch.randperm(X_train.size(0))
            for i in range(0, X_train.size(0), batch_size):
                indices = permutation[i:i + batch_size]
                batch_x, batch_y = X_train[indices], y_train[indices]
                self.optimizer.zero_grad()
                outputs = self.model(batch_x)
                loss = self.loss_fn(outputs, batch_y)
                loss.backward()
                self.optimizer.step()
            # Validation pass.
            self.model.eval()
            with torch.no_grad():
                val_outputs = self.model(X_val)
                val_loss = self.loss_fn(val_outputs, y_val)
                predictions = torch.argmax(val_outputs, dim=1)
                accuracy = accuracy_score(y_val.numpy(), predictions.numpy())
            print(f"Epoch {epoch + 1}/{epochs}, "
                  f"Loss: {val_loss.item():.4f}, Accuracy: {accuracy:.4f}")

    def evaluate(self, X, y):
        self.model.eval()
        X = torch.tensor(X.values, dtype=torch.float32)
        y = torch.tensor(y.values, dtype=torch.long)
        with torch.no_grad():
            outputs = self.model(X)
            predictions = torch.argmax(outputs, dim=1)
        return accuracy_score(y.numpy(), predictions.numpy())


class ExperimentTracker:
    def __init__(self, experiment_name):
        self.experiment_name = experiment_name
        self.metrics = []
        self.params = {}  # default so report() works before log_params is called

    def log_metric(self, name, value):
        self.metrics.append({
            'experiment': self.experiment_name,
            'metric': name,
            'value': value,
            'timestamp': pd.Timestamp.now(),
        })

    def log_params(self, params):
        self.params = params

    def report(self):
        print(f"Experiment: {self.experiment_name}")
        print(f"Parameters: {self.params}")
        print("Metrics:")
        for metric in self.metrics:
            print(f"  {metric['metric']}: {metric['value']} at {metric['timestamp']}")
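
A minimal usage sketch, assuming X is a DataFrame of preprocessed features and y a Series of integer class labels; the small network and its layer sizes are placeholders:

# Hypothetical wiring: a small feed-forward classifier.
model = nn.Sequential(
    nn.Linear(4, 16),   # 4 input features: adjust to your data
    nn.ReLU(),
    nn.Linear(16, 3),   # 3 output classes
)
trainer = ModelTrainingPipeline(
    model=model,
    optimizer=torch.optim.Adam(model.parameters(), lr=1e-3),
    loss_fn=nn.CrossEntropyLoss(),
)
trainer.train(X, y, epochs=20, batch_size=32)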

2.3 Model Deployment Pipeline

import joblib
import pandas as pd
from flask import Flask, request, jsonify


class ModelDeployer:
    def __init__(self, model, preprocessor):
        self.model = model
        self.preprocessor = preprocessor

    def save(self, model_path, preprocessor_path):
        joblib.dump(self.model, model_path)
        joblib.dump(self.preprocessor, preprocessor_path)

    @classmethod
    def load(cls, model_path, preprocessor_path):
        model = joblib.load(model_path)
        preprocessor = joblib.load(preprocessor_path)
        return cls(model, preprocessor)

    def predict(self, data):
        # Expects a scikit-learn-style model exposing predict().
        data = self.preprocessor.transform(data)
        return self.model.predict(data)


class FlaskAPI:
    def __init__(self, model_deployer):
        self.app = Flask(__name__)
        self.deployer = model_deployer

        # The route is registered at construction time via a closure.
        @self.app.route('/predict', methods=['POST'])
        def predict():
            data = request.get_json()
            df = pd.DataFrame(data)
            predictions = self.deployer.predict(df)
            return jsonify({'predictions': predictions.tolist()})

    def run(self, host='0.0.0.0', port=5000):
        self.app.run(host=host, port=port)


class ModelMonitor:
    def __init__(self):
        self.prediction_history = []

    def log_prediction(self, data, prediction, actual=None):
        self.prediction_history.append({
            'data': data,
            'prediction': prediction,
            'actual': actual,
            'timestamp': pd.Timestamp.now(),
        })

    def calculate_drift(self):
        # Total variation distance between the last 100 predictions and the
        # 100 before them; needs at least 200 logged predictions.
        if len(self.prediction_history) < 200:
            return 0.0
        recent = [p['prediction'] for p in self.prediction_history[-100:]]
        earlier = [p['prediction'] for p in self.prediction_history[-200:-100]]
        recent_dist = pd.Series(recent).value_counts(normalize=True)
        earlier_dist = pd.Series(earlier).value_counts(normalize=True)
        # fill_value=0 keeps categories that appear in only one window.
        return recent_dist.sub(earlier_dist, fill_value=0).abs().sum() / 2

    def report(self):
        print(f"Data Drift: {self.calculate_drift():.4f}")
        print(f"Total Predictions: {len(self.prediction_history)}")
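
A serving sketch, assuming model.joblib and preprocessor.joblib were saved earlier by ModelDeployer.save; the request payload is made up:

# Hypothetical serving entry point: restore the artifacts and expose /predict.
deployer = ModelDeployer.load('model.joblib', 'preprocessor.joblib')
api = FlaskAPI(deployer)
api.run(port=5000)

# From a client (column names are assumptions), e.g. with the requests library:
#   requests.post('http://localhost:5000/predict',
#                 json=[{'age': 31, 'city': 'beijing'}])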

3. Performance Comparison

3.1 Workflow Tool Comparison

Tool     | Ease of Use | Scalability | Monitoring | Deployment Complexity
Airflow  | -           | -           | -          | -
Prefect  | -           | -           | -          | -
Kubeflow | Very high   | Very high   | Very high  | -
MLflow   | -           | -           | -          | -

3.2 Preprocessing Framework Comparison

Framework | Processing Speed | Memory Efficiency | Feature Richness
Pandas    | -                | -                 | -
PySpark   | Very fast        | -                 | -
Dask      | -                | -                 | -
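
For data that outgrows one machine, the cleaning steps from section 2.1 translate directly to PySpark. A minimal sketch; the file paths and column name are made up:

# Minimal PySpark sketch of the cleaning stage (paths/columns are hypothetical).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("cleaning").getOrCreate()
df = spark.read.csv("data/raw.csv", header=True, inferSchema=True)
df = df.dropna().dropDuplicates()
df = df.withColumn("city", F.trim(F.col("city")))  # strip stray whitespace
df.write.mode("overwrite").parquet("data/clean.parquet")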

3.3 Deployment Method Comparison

Method     | Latency (ms) | Throughput | Scalability
Flask      | 50           | -          | -
TorchServe | 10           | -          | -
TensorRT   | 5            | Very high  | Very high

4. Best Practices

4.1 Pipeline Design

def build_pipeline(config):
    """Assemble a DataPipeline from a config dict of feature flags."""
    pipeline = DataPipeline()
    if config.get('cleaning', True):
        pipeline.add_transformer(DataCleaner())
    if config.get('feature_engineering', True):
        pipeline.add_transformer(FeatureEngineer())
    return pipeline


class PipelineFactory:
    @staticmethod
    def create(config):
        # ClassificationPipeline / RegressionPipeline are assumed to be
        # task-specific pipeline classes defined elsewhere.
        if config['type'] == 'classification':
            return ClassificationPipeline(config)
        elif config['type'] == 'regression':
            return RegressionPipeline(config)
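
For example, a hypothetical config that skips cleaning but keeps feature engineering:

# Hypothetical config; raw_df stands in for your training DataFrame.
config = {'cleaning': False, 'feature_engineering': True}
pipeline = build_pipeline(config)
features = pipeline.fit(raw_df)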

4.2 Pipeline Execution

class MLWorkflow:
    def __init__(self, data_pipeline, training_pipeline, deployer):
        self.data_pipeline = data_pipeline
        self.training_pipeline = training_pipeline
        self.deployer = deployer

    def run(self, data, labels):
        print("Step 1: Data Preprocessing")
        data = self.data_pipeline.fit(data)
        print("Step 2: Model Training")
        self.training_pipeline.train(data, labels)
        print("Step 3: Model Evaluation")
        # Note: this evaluates on the training data; prefer a held-out set in practice.
        accuracy = self.training_pipeline.evaluate(data, labels)
        print(f"Final Accuracy: {accuracy:.4f}")
        print("Step 4: Model Deployment")
        self.deployer.save('model.joblib', 'preprocessor.joblib')
        return accuracy
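
Wiring the pieces together, reusing the trainer and model from section 2.2 (raw_df and labels stand in for your own data):

# Hypothetical end-to-end run.
data_pipeline = build_pipeline({'cleaning': True, 'feature_engineering': True})
workflow = MLWorkflow(
    data_pipeline=data_pipeline,
    training_pipeline=trainer,                     # from section 2.2
    deployer=ModelDeployer(model, data_pipeline),  # artifacts to persist
)
workflow.run(raw_df, labels)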

5. Summary

Machine learning pipelines are key to production systems:

  1. Data preprocessing: ensure data quality
  2. Model training: build high-quality models
  3. Model deployment: serve models online
  4. Model monitoring: continuously track performance

From the comparison data:

  • Prefect is the most approachable workflow tool
  • PySpark suits large-scale data processing
  • TorchServe is a solid choice for model serving
  • MLflow is recommended for experiment tracking