Skip to content

管道与工作流

机器学习管道(Pipeline)是将数据预处理、特征工程和模型训练步骤组合成一个统一工作流的强大工具。它不仅能提高代码的可读性和可维护性,还能防止数据泄露,确保模型的可重现性。

基础管道概念

1. 简单管道

python
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris, load_boston
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt

# Load the iris data and hold out 20% of it as a test split
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Traditional approach (error-prone)
def traditional_approach():
    """Scale, train and evaluate by hand, one step at a time.

    Uses the module-level X_train / X_test / y_train / y_test split.

    Returns:
        The accuracy on the test split (also printed).
    """
    print("=== 传统方法 ===")

    # Step 1: learn the scaling statistics from the training split only and
    # reuse the fitted scaler for the test split (never re-fit on test data).
    scaler = StandardScaler().fit(X_train)
    train_scaled = scaler.transform(X_train)
    test_scaled = scaler.transform(X_test)

    # Step 2: train the classifier on the scaled training data.
    clf = LogisticRegression(random_state=42)
    clf.fit(train_scaled, y_train)

    # Step 3: score the predictions made on the scaled test data.
    accuracy = accuracy_score(y_test, clf.predict(test_scaled))

    print(f"准确率: {accuracy:.4f}")
    return accuracy

# Pipeline approach (recommended)
def pipeline_approach():
    """Run the same scale-then-classify workflow through a single Pipeline.

    Uses the module-level X_train / X_test / y_train / y_test split.

    Returns:
        The accuracy on the test split (also printed).
    """
    print("=== 管道方法 ===")

    # Scaler and classifier are chained into one estimator.
    steps = [
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(random_state=42)),
    ]
    pipeline = Pipeline(steps)

    # One fit call trains every step; one predict call applies them all.
    pipeline.fit(X_train, y_train)
    accuracy = accuracy_score(y_test, pipeline.predict(X_test))

    print(f"准确率: {accuracy:.4f}")
    return accuracy

# Compare the two approaches: both apply the same scaler and classifier
# to the same train/test split.
traditional_acc = traditional_approach()
pipeline_acc = pipeline_approach()

# Report the absolute gap between the two accuracies.
print(f"\n准确率差异: {abs(traditional_acc - pipeline_acc):.6f}")

2. 管道的优势

python
def demonstrate_pipeline_benefits():
    """Print a four-point summary of the benefits of using pipelines.

    The points are: leakage prevention, concise code, convenient parameter
    tuning and reproducibility. Nothing is returned.
    """
    # Every entry is printed verbatim, in order, one per line.
    report_lines = (
        "=== 管道优势演示 ===",
        # 1. Preventing data leakage
        "1. 防止数据泄露:",
        "   - 管道确保预处理步骤只在训练集上fit",
        "   - 测试集只进行transform操作",
        # 2. Concise code
        "\n2. 代码简洁性:",
        "   - 将多个步骤组合成一个对象",
        "   - 减少中间变量和重复代码",
        # 3. Convenient parameter tuning
        "\n3. 参数调优便利性:",
        "   - 可以同时调优预处理和模型参数",
        "   - 统一的接口进行网格搜索",
        # 4. Reproducibility
        "\n4. 可重现性:",
        "   - 完整保存预处理和模型状态",
        "   - 确保部署时的一致性",
    )
    for line in report_lines:
        print(line)

# Print the summary of pipeline benefits
demonstrate_pipeline_benefits()

复杂管道构建

1. 多步骤预处理管道

python
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectKBest, f_classif

# Build a dataset with mixed column types
def create_mixed_dataset():
    """Create a synthetic dataset with numeric and categorical features.

    The global NumPy random state is seeded with 42, so repeated calls return
    the same data.

    Returns:
        tuple: ``(numerical_features, categorical_features, y, feature_names)``
            * numerical_features: float array of shape (1000, 3); some entries
              of column 0 are NaN.
            * categorical_features: object array of shape (1000, 2) holding
              'A'/'B'/'C'; some entries of column 1 are NaN.
            * y: binary int target of shape (1000,).
            * feature_names: list with the five column names.
    """
    np.random.seed(42)

    n_samples = 1000

    # Numeric features
    numerical_features = np.random.randn(n_samples, 3)

    # Categorical features. The array must have object dtype: in a
    # fixed-width string array a missing marker is coerced to text
    # (None becomes the one-character string 'N') instead of staying missing.
    categorical_features = np.random.choice(
        ['A', 'B', 'C'], size=(n_samples, 2)
    ).astype(object)

    # Keep the clean signal column so that injecting NaN below does not
    # force the label of every affected row to 0.
    signal = numerical_features[:, 0].copy()

    # Inject missing values. Indices are drawn with replacement, so slightly
    # fewer than 50 / 30 distinct rows may be affected. NaN is used for both
    # blocks because it is the default missing-value marker of SimpleImputer.
    numerical_features[np.random.choice(n_samples, 50), 0] = np.nan
    categorical_features[np.random.choice(n_samples, 30), 1] = np.nan

    # Target: first numeric feature plus a bonus for category 'A' plus noise
    is_category_a = (categorical_features[:, 0] == 'A').astype(int)
    noise = np.random.randn(n_samples) * 0.1
    y = (signal + is_category_a + noise > 0).astype(int)

    # Column names, numeric block first
    feature_names = ['num_1', 'num_2', 'num_3', 'cat_1', 'cat_2']

    return numerical_features, categorical_features, y, feature_names

# Generate the mixed-type data used by the following examples
num_features, cat_features, y_mixed, feature_names = create_mixed_dataset()

def build_preprocessing_pipeline():
    """Assemble preprocessing, feature selection and a random forest.

    Columns 0-2 are routed through median imputation and standard scaling,
    columns 3-4 through constant imputation ('missing') and one-hot encoding.
    The five best features by ``f_classif`` then feed the classifier.

    Returns:
        The unfitted Pipeline.
    """
    numeric_columns = [0, 1, 2]    # positions of the numeric features
    categorical_columns = [3, 4]   # positions of the categorical features

    # Numeric branch
    numeric_steps = [
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ]

    # Categorical branch
    categorical_steps = [
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(drop='first', sparse_output=False)),
    ]

    # Route each column group to its own branch
    preprocessor = ColumnTransformer(transformers=[
        ('num', Pipeline(numeric_steps), numeric_columns),
        ('cat', Pipeline(categorical_steps), categorical_columns),
    ])

    # Full workflow
    return Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('feature_selection', SelectKBest(f_classif, k=5)),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])

# Build the pipeline under test
preprocessing_pipeline = build_preprocessing_pipeline()

# Put numeric and categorical columns side by side, then split 80/20
X_mixed = np.column_stack((num_features, cat_features))
X_train_mixed, X_test_mixed, y_train_mixed, y_test_mixed = train_test_split(
    X_mixed,
    y_mixed,
    test_size=0.2,
    random_state=42,
)

# Train on the training split and score on the held-out split
preprocessing_pipeline.fit(X_train_mixed, y_train_mixed)
y_pred_mixed = preprocessing_pipeline.predict(X_test_mixed)
accuracy_mixed = accuracy_score(y_test_mixed, y_pred_mixed)

print(f"混合数据管道准确率: {accuracy_mixed:.4f}")

# List the top-level steps of the pipeline
print("\n=== 管道结构 ===")
for i, (name, transformer) in enumerate(preprocessing_pipeline.steps):
    step_type = type(transformer).__name__
    print(f"步骤 {i+1}: {name} - {step_type}")

2. 特征工程管道

python
from sklearn.preprocessing import PolynomialFeatures, FunctionTransformer
from sklearn.base import BaseEstimator, TransformerMixin

# Custom feature-engineering transformer
class CustomFeatureEngineer(BaseEstimator, TransformerMixin):
    """Stateless transformer that appends squared and interaction features.

    Parameters:
        add_polynomial: when true, append the element-wise square of every
            input column.
        add_interactions: when true and the input has at least two columns,
            append the product of the first two columns.
    """

    def __init__(self, add_polynomial=True, add_interactions=True):
        self.add_polynomial = add_polynomial
        self.add_interactions = add_interactions

    def fit(self, X, y=None):
        """Nothing is learned from the data; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return a new array: the input columns followed by the extras.

        Column order is: original columns, squared columns (optional),
        interaction column (optional).
        """
        # Accept any 2-D array-like (ndarray, list of rows, DataFrame);
        # positional slicing below needs a real ndarray.
        X = np.asarray(X)
        blocks = [X]

        if self.add_polynomial:
            # Squared features
            blocks.append(X ** 2)

        if self.add_interactions and X.shape[1] >= 2:
            # Interaction of the first two columns, as a single column
            blocks.append((X[:, 0] * X[:, 1]).reshape(-1, 1))

        # column_stack always allocates a new array, so the input is never
        # returned by reference.
        return np.column_stack(blocks)

def _log_transform(X):
    """Return log(1 + |X|); the absolute value keeps the input non-negative."""
    return np.log1p(np.abs(X))


def _sqrt_transform(X):
    """Return sqrt(|X|).

    Not wired into the pipeline below; it can be passed to
    FunctionTransformer in place of ``_log_transform``.
    """
    return np.sqrt(np.abs(X))


def create_feature_engineering_pipeline():
    """Build a feature-engineering pipeline ending in a random forest.

    Steps, in order: median imputation, standard scaling, log transform,
    custom squared/interaction features, degree-2 polynomial features,
    selection of the 10 best features by ``f_classif``, a final standard
    scaling and the classifier.

    The math transforms live at module level rather than inside this
    function: pickle can only serialize functions that are importable by
    name, so a locally defined function would make the pipeline impossible
    to save with pickle.

    Returns:
        The unfitted Pipeline.
    """
    feature_pipeline = Pipeline([
        # Basic preprocessing
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),

        # Math transform
        ('log_transform', FunctionTransformer(_log_transform)),

        # Custom feature engineering
        ('custom_features', CustomFeatureEngineer(add_polynomial=True, add_interactions=True)),

        # Polynomial features
        ('polynomial', PolynomialFeatures(degree=2, include_bias=False, interaction_only=False)),

        # Feature selection
        ('feature_selection', SelectKBest(f_classif, k=10)),

        # Final scaling
        ('final_scaler', StandardScaler()),

        # Classifier
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
    ])

    return feature_pipeline

# Build and exercise the feature-engineering pipeline
feature_eng_pipeline = create_feature_engineering_pipeline()

# Train and evaluate on the raw iris split
feature_eng_pipeline.fit(X_train, y_train)
y_pred_fe = feature_eng_pipeline.predict(X_test)
accuracy_fe = accuracy_score(y_test, y_pred_fe)

print(f"特征工程管道准确率: {accuracy_fe:.4f}")

# Trace how the number of features changes from step to step.
# Every step was already fitted by the fit() call above, so the trace only
# calls transform() and leaves the fitted pipeline untouched.
print("\n=== 特征变化过程 ===")
X_temp = X_train.copy()
for i, (name, transformer) in enumerate(feature_eng_pipeline.steps[:-1]):  # skip the final classifier
    if hasattr(transformer, 'transform'):
        X_temp = transformer.transform(X_temp)
        print(f"步骤 {i+1} ({name}): {X_temp.shape[1]} 个特征")

管道参数调优

1. 网格搜索与管道

python
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV

def pipeline_grid_search():
    """Grid-search a scaler / selector / random-forest pipeline on iris.

    Fits on the module-level training split with 5-fold cross-validation,
    prints the best parameters, the best CV score and the test-split score.

    Returns:
        The fitted GridSearchCV object.
    """
    # Base pipeline whose parameters are searched
    pipe = Pipeline(steps=[
        ('scaler', StandardScaler()),
        ('feature_selection', SelectKBest()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ])

    # Keys follow the "<step name>__<parameter>" convention
    selector_grid = {'feature_selection__k': [2, 3, 4, 'all']}
    classifier_grid = {
        'classifier__n_estimators': [50, 100, 200],
        'classifier__max_depth': [3, 5, 10, None],
        'classifier__min_samples_split': [2, 5, 10],
    }
    param_grid = {**selector_grid, **classifier_grid}

    grid_search = GridSearchCV(
        estimator=pipe,
        param_grid=param_grid,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
        verbose=1,
    )

    # Run the search
    print("开始网格搜索...")
    grid_search.fit(X_train, y_train)

    # Report the winning configuration
    print(f"\n最佳参数: {grid_search.best_params_}")
    print(f"最佳交叉验证分数: {grid_search.best_score_:.4f}")

    # Score the refitted best pipeline on the held-out split
    test_score = grid_search.best_estimator_.score(X_test, y_test)
    print(f"测试集分数: {test_score:.4f}")

    return grid_search

# Run the grid search and keep the fitted search object
grid_results = pipeline_grid_search()

2. 随机搜索优化

python
from scipy.stats import randint, uniform

def pipeline_random_search():
    """Randomized hyper-parameter search over a scaler + random-forest pipeline.

    Samples 50 candidates, evaluates each with 5-fold cross-validation on the
    module-level training split, and prints the best parameters, the best CV
    score and the test-split score.

    Returns:
        The fitted RandomizedSearchCV object.
    """
    pipe = Pipeline(steps=[
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ])

    # Distributions to sample from, keyed "<step name>__<parameter>"
    param_distributions = {
        'classifier__n_estimators': randint(50, 300),
        'classifier__max_depth': randint(3, 20),
        'classifier__min_samples_split': randint(2, 20),
        'classifier__min_samples_leaf': randint(1, 10),
        'classifier__max_features': uniform(0.1, 0.9),
    }

    random_search = RandomizedSearchCV(
        estimator=pipe,
        param_distributions=param_distributions,
        n_iter=50,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
        random_state=42,
        verbose=1,
    )

    print("开始随机搜索...")
    random_search.fit(X_train, y_train)

    print(f"\n最佳参数: {random_search.best_params_}")
    print(f"最佳交叉验证分数: {random_search.best_score_:.4f}")

    # Score the refitted best pipeline on the held-out split
    best_pipeline = random_search.best_estimator_
    test_score = best_pipeline.score(X_test, y_test)
    print(f"测试集分数: {test_score:.4f}")

    return random_search

# Run the randomized search and keep the fitted search object
random_results = pipeline_random_search()

管道可视化和调试

1. 管道可视化

python
def visualize_pipeline(pipeline, X_sample=None, y_sample=None):
    """Print the structure of a pipeline and optionally trace data through it.

    Args:
        pipeline: object exposing ``steps`` as a list of (name, estimator).
        X_sample: optional sample data. When given, it is pushed through
            every step except the last one and the shape after each step is
            printed.
        y_sample: optional labels for ``X_sample``. Only used to fit steps
            that are not fitted yet (supervised steps such as SelectKBest
            cannot be fitted without labels).

    Fitted steps are only asked to ``transform``. Unfitted steps are fitted
    on a clone, so the pipeline passed in is never modified. If an unfitted
    step cannot be fitted with the data provided, a message is printed and
    the trace stops.
    """
    print("=== 管道结构 ===")
    for i, (name, transformer) in enumerate(pipeline.steps):
        print(f"步骤 {i+1}: {name}")
        print(f"  类型: {type(transformer).__name__}")

        # Show the parameters
        if hasattr(transformer, 'get_params'):
            params = transformer.get_params()
            key_params = {k: v for k, v in params.items() if not k.endswith('_')}
            if key_params:
                print(f"  主要参数: {key_params}")

        print()

    if X_sample is None:
        return

    # Imported here because only the data trace needs these helpers.
    from sklearn.base import clone
    from sklearn.exceptions import NotFittedError
    from sklearn.utils.validation import check_is_fitted

    print("=== 数据变化过程 ===")
    X_temp = X_sample.copy()
    print(f"输入数据形状: {X_temp.shape}")

    for i, (name, transformer) in enumerate(pipeline.steps[:-1]):  # skip the final estimator
        if not hasattr(transformer, 'transform'):
            continue

        try:
            check_is_fitted(transformer)
        except NotFittedError:
            # Not fitted yet: fit a throw-away clone on the sample.
            try:
                X_temp = clone(transformer).fit_transform(X_temp, y_sample)
            except (TypeError, ValueError) as exc:
                print(f"步骤 {i+1} ({name}): 尚未拟合且无法在当前样本上拟合 ({exc}),停止跟踪")
                break
        else:
            X_temp = transformer.transform(X_temp)

        print(f"步骤 {i+1} ({name}) 后: {X_temp.shape}")


# Visualize a pipeline
sample_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('feature_selection', SelectKBest(k=3)),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Fit first so that the trace only has to call transform() on each step;
# SelectKBest needs labels and could not be fitted from X alone.
sample_pipeline.fit(X_train, y_train)

visualize_pipeline(sample_pipeline, X_train[:5])  # trace the first 5 samples

2. 管道调试工具

python
class DebugTransformer(BaseEstimator, TransformerMixin):
    """Pass-through transformer that prints a summary of the data it sees.

    Parameters:
        name: label printed in the header of each report.
        print_shape: when true, print the shape of the data.
        print_stats: when true, print range, mean, standard deviation and
            the number of missing values.
    """

    def __init__(self, name="Debug", print_shape=True, print_stats=True):
        self.name = name
        self.print_shape = print_shape
        self.print_stats = print_stats

    def fit(self, X, y=None):
        """Nothing is learned from the data; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Print the requested summary and return ``X`` unchanged."""
        print(f"\n=== {self.name} ===")

        if self.print_shape:
            print(f"数据形状: {X.shape}")

        if self.print_stats:
            # A float ndarray view lets the same code handle ndarrays and
            # DataFrames; the nan-aware reductions keep the statistics
            # meaningful when the data still contains missing values.
            values = np.asarray(X, dtype=float)
            print(f"数据范围: [{np.nanmin(values):.4f}, {np.nanmax(values):.4f}]")
            print(f"数据均值: {np.nanmean(values):.4f}")
            print(f"数据标准差: {np.nanstd(values):.4f}")

            # Count the missing values
            missing_count = int(np.isnan(values).sum())
            print(f"缺失值数量: {missing_count}")

        return X

def create_debug_pipeline():
    """Build an impute / scale / select / classify pipeline with debug taps.

    A DebugTransformer sits before the first step and after every
    preprocessing step, so each stage reports on the data it receives.

    Returns:
        The unfitted Pipeline.
    """
    steps = [
        ('debug_input', DebugTransformer("输入数据")),
        ('imputer', SimpleImputer(strategy='median')),
        ('debug_after_impute', DebugTransformer("填充缺失值后")),
        ('scaler', StandardScaler()),
        ('debug_after_scale', DebugTransformer("标准化后")),
        ('feature_selection', SelectKBest(k=3)),
        ('debug_after_selection', DebugTransformer("特征选择后")),
        ('classifier', RandomForestClassifier(n_estimators=50, random_state=42)),
    ]
    return Pipeline(steps=steps)

# Try out the debug pipeline
debug_pipe = create_debug_pipeline()

# Work on a copy of the training data and blank out two cells,
# (row 0, column 0) and (row 1, column 1), to give the imputer work to do
X_train_debug = X_train.copy()
X_train_debug[[0, 1], [0, 1]] = np.nan

print("=== 调试管道执行过程 ===")
debug_pipe.fit(X_train_debug, y_train)

管道持久化和部署

1. 管道保存和加载

python
import joblib
import pickle
from datetime import datetime

def save_pipeline(pipeline, model_name="model", save_format="joblib"):
    """Persist a pipeline together with a JSON metadata file.

    Args:
        pipeline: object to save; its ``steps`` attribute is read for the
            metadata.
        model_name: prefix of both file names (a timestamp is appended).
        save_format: "joblib" writes a ``.joblib`` file with joblib; any
            other value writes a ``.pkl`` file with pickle.

    Returns:
        tuple: ``(filename, metadata_filename)``.
    """
    import json

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    use_joblib = save_format == "joblib"

    # Serialize the pipeline itself
    if not use_joblib:
        filename = f"{model_name}_{timestamp}.pkl"
        with open(filename, 'wb') as model_fh:
            pickle.dump(pipeline, model_fh)
    else:
        filename = f"{model_name}_{timestamp}.joblib"
        joblib.dump(pipeline, filename)

    print(f"管道已保存为: {filename}")

    # Describe what was saved; 'unknown' is recorded when the pipeline does
    # not expose n_features_in_
    step_names = [name for name, *_ in pipeline.steps]
    metadata = {
        'model_name': model_name,
        'timestamp': timestamp,
        'pipeline_steps': step_names,
        'feature_count': getattr(pipeline, 'n_features_in_', 'unknown'),
        'save_format': save_format,
    }

    metadata_filename = f"{model_name}_{timestamp}_metadata.json"
    with open(metadata_filename, 'w') as meta_fh:
        json.dump(metadata, meta_fh, indent=2)

    print(f"元数据已保存为: {metadata_filename}")

    return filename, metadata_filename

def load_pipeline(filename):
    """Load a pipeline previously written to disk.

    Files ending in ``.joblib`` are read with joblib, everything else with
    pickle.

    NOTE: both loaders can execute arbitrary code embedded in the file, so
    only load files that come from a trusted source.

    Returns:
        The deserialized object.
    """
    is_joblib_file = filename.endswith('.joblib')

    if not is_joblib_file:
        with open(filename, 'rb') as model_fh:
            restored = pickle.load(model_fh)
    else:
        restored = joblib.load(filename)

    print(f"管道已从 {filename} 加载")
    return restored

# Train the pipeline that will be persisted
final_pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])
final_pipeline.fit(X_train, y_train)

# Write it to disk, then read it back
model_file, metadata_file = save_pipeline(final_pipeline, model_name="iris_classifier")
loaded_pipeline = load_pipeline(model_file)

# The restored pipeline should predict exactly like the in-memory one
original_predictions = final_pipeline.predict(X_test)
loaded_predictions = loaded_pipeline.predict(X_test)

print(f"预测结果一致性: {np.array_equal(loaded_predictions, original_predictions)}")

2. 生产环境管道

python
class ProductionPipeline:
    """Wrapper that adds input validation and metadata to a fitted pipeline.

    Args:
        pipeline: fitted estimator exposing ``predict`` and ``steps``
            (and optionally ``predict_proba`` / ``n_features_in_``).
        feature_names: optional feature names, reported by ``get_info``.
        target_names: optional sequence mapping predicted class indices to
            labels; when given, ``predict`` returns a list of labels.
    """

    def __init__(self, pipeline, feature_names=None, target_names=None):
        self.pipeline = pipeline
        self.feature_names = feature_names
        self.target_names = target_names
        self.version = "1.0"
        self.created_at = datetime.now()

    def _validate_input(self, X):
        """Return ``X`` as 2-D input or raise ValueError.

        Shared by ``predict`` and ``predict_proba`` so both apply the same
        checks: the type must be ndarray or DataFrame, a 1-D array is
        treated as a single sample, and the feature count must match the
        pipeline's ``n_features_in_`` when that attribute is available.
        """
        if not isinstance(X, (np.ndarray, pd.DataFrame)):
            raise ValueError("输入必须是numpy数组或pandas DataFrame")

        if len(X.shape) == 1:
            X = X.reshape(1, -1)

        expected_features = getattr(self.pipeline, 'n_features_in_', None)
        if expected_features and X.shape[1] != expected_features:
            raise ValueError(f"期望 {expected_features} 个特征,但得到 {X.shape[1]} 个")

        return X

    def predict(self, X):
        """Predict with input validation.

        Returns:
            The pipeline's predictions, or a list of labels when
            ``target_names`` was provided.

        Raises:
            ValueError: if the input fails validation.
            RuntimeError: if prediction or label mapping fails.
        """
        X = self._validate_input(X)

        try:
            predictions = self.pipeline.predict(X)

            # Map class indices to labels. The explicit None/length test
            # also works when target_names is a numpy array, whose truth
            # value would be ambiguous.
            if self.target_names is not None and len(self.target_names) > 0:
                predictions = [self.target_names[pred] for pred in predictions]

            return predictions

        except Exception as e:
            raise RuntimeError(f"预测过程中出错: {str(e)}") from e

    def predict_proba(self, X):
        """Predict class probabilities with the same validation as ``predict``.

        Raises:
            AttributeError: if the wrapped pipeline has no ``predict_proba``.
            ValueError: if the input fails validation.
            RuntimeError: if the pipeline raises while predicting.
        """
        if not hasattr(self.pipeline, 'predict_proba'):
            raise AttributeError("管道不支持概率预测")

        X = self._validate_input(X)

        try:
            return self.pipeline.predict_proba(X)
        except Exception as e:
            raise RuntimeError(f"概率预测过程中出错: {str(e)}") from e

    def get_info(self):
        """Return a dict describing the wrapped model.

        Keys: version, created_at (ISO string), pipeline_steps,
        feature_names, target_names and n_features ('unknown' when the
        pipeline does not expose ``n_features_in_``).
        """
        return {
            'version': self.version,
            'created_at': self.created_at.isoformat(),
            'pipeline_steps': [step[0] for step in self.pipeline.steps],
            'feature_names': self.feature_names,
            'target_names': self.target_names,
            'n_features': getattr(self.pipeline, 'n_features_in_', 'unknown')
        }

# Wrap the trained pipeline for production use
production_model = ProductionPipeline(
    final_pipeline,
    feature_names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    target_names=['setosa', 'versicolor', 'virginica'],
)

# Show the wrapper's metadata
print("=== 生产环境管道测试 ===")
print("模型信息:")
info = production_model.get_info()
for key, value in info.items():
    print(f"  {key}: {value}")

# Single-sample prediction (the slice keeps the input 2-D)
sample = X_test[:1]
pred = production_model.predict(sample)
prob = production_model.predict_proba(sample)

print(f"\n单样本预测: {pred}")
print(f"预测概率: {prob}")

# Batch prediction on the first five test samples
batch_pred = production_model.predict(X_test[:5])
print(f"批量预测: {batch_pred}")

管道最佳实践

1. 管道设计原则

python
def pipeline_best_practices():
    """Print a six-section checklist of pipeline design best practices.

    The sections cover preprocessing, feature engineering, model selection,
    validation/testing, deployment/maintenance and performance.
    Nothing is returned.
    """
    # The checklist is emitted as one block of text.
    print("""
    === 管道设计最佳实践 ===
    
    1. 数据预处理原则:
       ✓ 所有预处理步骤都应该在管道内
       ✓ 避免在管道外进行数据变换
       ✓ 使用ColumnTransformer处理混合类型数据
       ✓ 确保预处理步骤的可逆性(如果需要)
    
    2. 特征工程原则:
       ✓ 将特征工程步骤集成到管道中
       ✓ 使用自定义Transformer封装复杂逻辑
       ✓ 保持特征工程的可解释性
       ✓ 避免数据泄露(不要使用未来信息)
    
    3. 模型选择原则:
       ✓ 将模型作为管道的最后一步
       ✓ 使用网格搜索优化整个管道
       ✓ 考虑模型的计算复杂度
       ✓ 保持模型的可解释性需求
    
    4. 验证和测试原则:
       ✓ 使用交叉验证评估管道性能
       ✓ 在独立测试集上验证最终性能
       ✓ 检查管道的鲁棒性
       ✓ 监控管道在生产环境中的表现
    
    5. 部署和维护原则:
       ✓ 版本控制管道和数据
       ✓ 监控模型性能衰减
       ✓ 建立模型重训练机制
       ✓ 保持管道的向后兼容性
    
    6. 性能优化原则:
       ✓ 使用并行处理(n_jobs=-1)
       ✓ 缓存中间结果(memory参数)
       ✓ 选择合适的数据类型
       ✓ 优化内存使用
    """)

# Print the best-practices checklist
pipeline_best_practices()

2. 常见错误和解决方案

python
def common_pipeline_mistakes():
    """Print five common pipeline mistakes, each with a wrong and a right example.

    The text is illustrative only: the code shown inside it is printed,
    not executed. Nothing is returned.
    """
    # The whole guide is emitted as one block of text.
    print("""
    === 常见管道错误和解决方案 ===
    
    错误1: 在管道外预处理数据
    ❌ 错误做法:
        X_scaled = StandardScaler().fit_transform(X_train)
        model.fit(X_scaled, y_train)
    
    ✅ 正确做法:
        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('model', LogisticRegression())
        ])
        pipeline.fit(X_train, y_train)
    
    错误2: 数据泄露
    ❌ 错误做法:
        # 在整个数据集上进行特征选择
        selector = SelectKBest(k=5)
        X_selected = selector.fit_transform(X, y)
        X_train, X_test, y_train, y_test = train_test_split(X_selected, y)
    
    ✅ 正确做法:
        X_train, X_test, y_train, y_test = train_test_split(X, y)
        pipeline = Pipeline([
            ('selector', SelectKBest(k=5)),
            ('model', LogisticRegression())
        ])
        pipeline.fit(X_train, y_train)
    
    错误3: 忘记处理分类特征
    ❌ 错误做法:
        # 直接将字符串特征传给模型
        pipeline = Pipeline([
            ('scaler', StandardScaler()),  # 会报错
            ('model', LogisticRegression())
        ])
    
    ✅ 正确做法:
        preprocessor = ColumnTransformer([
            ('num', StandardScaler(), numerical_columns),
            ('cat', OneHotEncoder(), categorical_columns)
        ])
        pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', LogisticRegression())
        ])
    
    错误4: 参数调优时的数据泄露
    ❌ 错误做法:
        # 在整个数据集上进行网格搜索
        grid_search = GridSearchCV(pipeline, param_grid)
        grid_search.fit(X, y)  # 数据泄露
    
    ✅ 正确做法:
        X_train, X_test, y_train, y_test = train_test_split(X, y)
        grid_search = GridSearchCV(pipeline, param_grid, cv=5)
        grid_search.fit(X_train, y_train)
        test_score = grid_search.score(X_test, y_test)
    
    错误5: 不保存预处理器状态
    ❌ 错误做法:
        # 只保存模型,不保存预处理器
        joblib.dump(model, 'model.pkl')
    
    ✅ 正确做法:
        # 保存整个管道
        joblib.dump(pipeline, 'pipeline.pkl')
    """)

# Print the list of common mistakes and their fixes
common_pipeline_mistakes()

高级管道技巧

1. 条件管道

python
from sklearn.base import BaseEstimator, TransformerMixin

class ConditionalTransformer(BaseEstimator, TransformerMixin):
    """Transformer that delegates to one of two transformers, chosen at fit time.

    Parameters:
        condition_func: callable taking the training data ``X`` and returning
            a truthy value to select ``transformer_true``, falsy otherwise.
        transformer_true: transformer used when the condition holds.
        transformer_false: transformer used when it does not.

    Attributes set by ``fit``:
        use_true_transformer: bool result of ``condition_func(X)``.
        transformer_: fitted clone of the selected transformer.
    """

    def __init__(self, condition_func, transformer_true, transformer_false):
        self.condition_func = condition_func
        self.transformer_true = transformer_true
        self.transformer_false = transformer_false
        # Stays None until fit() has evaluated the condition.
        self.use_true_transformer = None

    def fit(self, X, y=None):
        """Evaluate the condition on ``X`` and fit the selected transformer."""
        from sklearn.base import clone

        self.use_true_transformer = bool(self.condition_func(X))
        chosen = self.transformer_true if self.use_true_transformer else self.transformer_false

        # Fit a clone so the instances passed to the constructor stay
        # unfitted and can safely be shared or reused by the caller.
        self.transformer_ = clone(chosen).fit(X, y)

        return self

    def transform(self, X):
        """Apply the transformer selected during ``fit``.

        Raises:
            sklearn.exceptions.NotFittedError: if called before ``fit``.
        """
        from sklearn.utils.validation import check_is_fitted

        check_is_fitted(self, 'transformer_')
        return self.transformer_.transform(X)

# Example: pick a feature-selection strategy based on the number of features
def is_high_dimensional(X, threshold=10):
    """Return True when ``X`` has more than ``threshold`` columns.

    Args:
        X: array-like exposing ``shape``; the column count is ``shape[1]``.
        threshold: number of columns above which the data counts as
            high-dimensional (10 by default).
    """
    return X.shape[1] > threshold

# Build the conditional pipeline
conditional_pipeline = Pipeline(steps=[
    ('scaler', StandardScaler()),
    (
        'conditional_selector',
        ConditionalTransformer(
            condition_func=is_high_dimensional,
            transformer_true=SelectKBest(k=10),      # high-dimensional: keep the 10 best features
            transformer_false=SelectKBest(k='all'),  # low-dimensional: keep every feature
        ),
    ),
    ('classifier', RandomForestClassifier(random_state=42)),
])

print("=== 条件管道测试 ===")
conditional_pipeline.fit(X_train, y_train)
cond_pred = conditional_pipeline.predict(X_test)
cond_accuracy = accuracy_score(y_test, cond_pred)
print(f"条件管道准确率: {cond_accuracy:.4f}")

2. 并行管道

python
from sklearn.pipeline import FeatureUnion

def create_parallel_pipeline():
    """Build a pipeline whose features come from two branches side by side.

    One branch selects the 2 best raw features and scales them; the other
    expands to degree-2 polynomial features, selects the 3 best and scales
    them. A FeatureUnion concatenates both outputs for the random forest.

    Returns:
        The unfitted Pipeline.
    """
    # Branch 1: best raw features
    statistical_branch = Pipeline(steps=[
        ('selector1', SelectKBest(k=2)),
        ('scaler1', StandardScaler()),
    ])

    # Branch 2: best polynomial features
    polynomial_branch = Pipeline(steps=[
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('selector2', SelectKBest(k=3)),
        ('scaler2', StandardScaler()),
    ])

    feature_union = FeatureUnion(transformer_list=[
        ('statistical_features', statistical_branch),
        ('polynomial_features', polynomial_branch),
    ])

    # Full pipeline: combined features, then the classifier
    return Pipeline(steps=[
        ('features', feature_union),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])

# Train and evaluate the parallel pipeline
parallel_pipe = create_parallel_pipeline()
parallel_pipe.fit(X_train, y_train)
parallel_pred = parallel_pipe.predict(X_test)
parallel_accuracy = accuracy_score(y_test, parallel_pred)

print(f"并行管道准确率: {parallel_accuracy:.4f}")

# Inspect the shape of the combined feature matrix
feature_union = parallel_pipe.named_steps['features']
X_transformed = feature_union.transform(X_train)
print(f"并行特征处理后的维度: {X_transformed.shape}")

总结

管道是机器学习工作流的核心工具,它提供了:

主要优势:

  1. 防止数据泄露:确保预处理只在训练集上拟合
  2. 代码简洁:将多个步骤组合成统一接口
  3. 便于调优:可以同时优化预处理和模型参数
  4. 易于部署:保存完整的处理流程
  5. 提高可重现性:确保处理步骤的一致性

最佳实践:

  • 将所有预处理步骤包含在管道中
  • 使用ColumnTransformer处理混合数据类型
  • 通过网格搜索优化整个管道
  • 保存完整管道而不仅仅是模型
  • 在生产环境中添加输入验证

高级技巧:

  • 自定义Transformer封装复杂逻辑
  • 使用FeatureUnion进行并行特征处理
  • 条件管道根据数据特征选择处理方式
  • 调试管道监控数据变化过程

下一章我们将学习超参数调优,深入了解如何系统地优化模型性能。

本站内容仅供学习和研究使用。