import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder


class DataPipeline:
    """Ordered chain of data transformers.

    A transformer is either an object exposing ``fit_transform`` /
    ``transform`` (sklearn-style) or a plain callable applied as-is.
    """

    def __init__(self):
        # Transformers are applied in insertion order.
        self.transformers = []

    def add_transformer(self, transformer):
        """Append *transformer* to the end of the chain."""
        self.transformers.append(transformer)

    def fit(self, data):
        """Fit every transformer in order, passing the output of each
        to the next. Returns the fully transformed data.
        """
        for transformer in self.transformers:
            # Check for the method we actually call: the original checked
            # hasattr(..., 'fit') but invoked fit_transform, which would
            # crash on an object providing only fit().
            if hasattr(transformer, 'fit_transform'):
                data = transformer.fit_transform(data)
            else:
                data = transformer(data)
        return data

    def transform(self, data):
        """Apply already-fitted transformers in order (no refitting)."""
        for transformer in self.transformers:
            if hasattr(transformer, 'transform'):
                data = transformer.transform(data)
            else:
                data = transformer(data)
        return data


class DataCleaner:
    """Stateless cleaner: drop missing rows, drop duplicates, strip
    whitespace from string columns. Usable directly in DataPipeline
    as a plain callable.
    """

    def __init__(self):
        pass

    def __call__(self, df):
        df = df.dropna()
        df = df.drop_duplicates()
        # NOTE(review): stripping happens AFTER drop_duplicates, so rows
        # differing only in surrounding whitespace survive deduplication —
        # preserved as-is; confirm whether that ordering is intentional.
        for col in df.columns:
            if df[col].dtype == 'object':
                df[col] = df[col].str.strip()
        return df


class FeatureEngineer:
    """Scale numeric columns and label-encode categorical columns.

    Fit state: one StandardScaler for the numeric block and one
    LabelEncoder PER categorical column.
    """

    def __init__(self):
        self.scaler = StandardScaler()
        # Kept for backward compatibility with any external code that
        # referenced the old single-encoder attribute; no longer used
        # internally.
        self.encoder = LabelEncoder()
        # BUG FIX: the original shared one LabelEncoder across all
        # categorical columns. fit_transform refit it on every column,
        # so transform() only knew the classes of the LAST column and
        # raised "unseen labels" (or silently mis-encoded) the others.
        self.encoders = {}

    def fit_transform(self, df):
        """Fit scaler/encoders on *df* and return the transformed frame."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=['object']).columns
        if len(numeric_cols) > 0:
            df[numeric_cols] = self.scaler.fit_transform(df[numeric_cols])
        for col in categorical_cols:
            # Dedicated encoder per column so each remembers its own classes.
            enc = LabelEncoder()
            df[col] = enc.fit_transform(df[col])
            self.encoders[col] = enc
        return df

    def transform(self, df):
        """Transform *df* with the already-fitted scaler/encoders."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=['object']).columns
        if len(numeric_cols) > 0:
            df[numeric_cols] = self.scaler.transform(df[numeric_cols])
        for col in categorical_cols:
            # KeyError here means the column was not seen during fit.
            df[col] = self.encoders[col].transform(df[col])
        return df
2.2 模型训练流水线 (Model Training Pipeline)
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score


class ModelTrainingPipeline:
    """Minimal train/evaluate loop for a PyTorch classifier.

    Expects *X*/*y* to be pandas objects (``.values`` is read), targets
    as integer class labels, and *loss_fn* compatible with raw model
    outputs (e.g. CrossEntropyLoss on logits).
    """

    def __init__(self, model, optimizer, loss_fn):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn

    def train(self, X, y, epochs=100, batch_size=32):
        """Train for *epochs*, printing validation loss/accuracy per epoch.

        20% of the data is held out for validation. NOTE(review): the
        split uses no random_state, so runs are not reproducible —
        preserved as-is; confirm whether determinism is wanted.
        """
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)
        X_train = torch.tensor(X_train.values, dtype=torch.float32)
        y_train = torch.tensor(y_train.values, dtype=torch.long)
        X_val = torch.tensor(X_val.values, dtype=torch.float32)
        y_val = torch.tensor(y_val.values, dtype=torch.long)
        for epoch in range(epochs):
            self.model.train()
            # Fresh shuffle each epoch, then mini-batches by index slices.
            permutation = torch.randperm(X_train.size()[0])
            for i in range(0, X_train.size()[0], batch_size):
                indices = permutation[i:i + batch_size]
                batch_x, batch_y = X_train[indices], y_train[indices]
                self.optimizer.zero_grad()
                outputs = self.model(batch_x)
                loss = self.loss_fn(outputs, batch_y)
                loss.backward()
                self.optimizer.step()
            # Per-epoch validation pass with gradients disabled.
            self.model.eval()
            with torch.no_grad():
                val_outputs = self.model(X_val)
                val_loss = self.loss_fn(val_outputs, y_val)
                predictions = torch.argmax(val_outputs, dim=1)
                accuracy = accuracy_score(y_val.numpy(), predictions.numpy())
            print(f"Epoch {epoch+1}/{epochs}, Loss: {val_loss.item():.4f}, Accuracy: {accuracy:.4f}")

    def evaluate(self, X, y):
        """Return accuracy of the (already trained) model on *X*, *y*."""
        self.model.eval()
        X = torch.tensor(X.values, dtype=torch.float32)
        y = torch.tensor(y.values, dtype=torch.long)
        with torch.no_grad():
            outputs = self.model(X)
            predictions = torch.argmax(outputs, dim=1)
            accuracy = accuracy_score(y.numpy(), predictions.numpy())
        return accuracy


class ExperimentTracker:
    """Accumulates named metrics (with timestamps) and a params dict for
    one experiment, and prints a plain-text report.
    """

    def __init__(self, experiment_name):
        self.experiment_name = experiment_name
        self.metrics = []
        # BUG FIX: the original only created self.params inside
        # log_params(), so report() raised AttributeError whenever
        # log_params() had not been called first.
        self.params = {}

    def log_metric(self, name, value):
        """Record one metric observation, timestamped now."""
        self.metrics.append({
            'experiment': self.experiment_name,
            'metric': name,
            'value': value,
            'timestamp': pd.Timestamp.now()
        })

    def log_params(self, params):
        """Store the experiment's hyperparameters (overwrites previous)."""
        self.params = params

    def report(self):
        """Print experiment name, parameters, and all logged metrics."""
        print(f"Experiment: {self.experiment_name}")
        print(f"Parameters: {self.params}")
        print("Metrics:")
        for metric in self.metrics:
            print(f"  {metric['metric']}: {metric['value']} at {metric['timestamp']}")