管道与工作流
机器学习管道(Pipeline)是将数据预处理、特征工程和模型训练步骤组合成一个统一工作流的强大工具。它不仅能提高代码的可读性和可维护性,还能防止数据泄露,确保模型的可重现性。
基础管道概念
1. 简单管道
python
import numpy as np
import pandas as pd
# load_boston was removed from scikit-learn in 1.2 (importing it raises ImportError)
# and is not used anywhere here, so only load_iris is imported.
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt

# Load the iris data and hold out 20% of the samples as a test split.
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Traditional approach (error-prone): every preprocessing step is wired up by hand.
def traditional_approach():
    """Scale the data and train logistic regression as separate manual steps.

    Uses the module-level ``X_train``/``X_test``/``y_train``/``y_test`` split.

    Returns:
        float: accuracy on the test split.
    """
    print("=== 传统方法 ===")

    # Learn the scaling statistics from the training split only; the test split
    # is transformed with them (re-fitting on it would leak test information).
    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(X_train)
    test_scaled = scaler.transform(X_test)

    clf = LogisticRegression(random_state=42)
    clf.fit(train_scaled, y_train)

    acc = accuracy_score(y_test, clf.predict(test_scaled))
    print(f"准确率: {acc:.4f}")
    return acc
# Pipeline approach (recommended): the same steps bundled into one estimator.
def pipeline_approach():
    """Train the scaler + logistic-regression combination as a single Pipeline.

    Uses the module-level iris train/test split.

    Returns:
        float: accuracy on the test split.
    """
    print("=== 管道方法 ===")
    steps = [
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(random_state=42)),
    ]
    # Pipeline.fit returns the pipeline itself, so fitting and predicting chain.
    predictions = Pipeline(steps).fit(X_train, y_train).predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print(f"准确率: {accuracy:.4f}")
    return accuracy
# 比较两种方法
traditional_acc = traditional_approach()
pipeline_acc = pipeline_approach()
print(f"\n准确率差异: {abs(traditional_acc - pipeline_acc):.6f}")2. 管道的优势
python
def demonstrate_pipeline_benefits():
    """Print a fixed, four-part summary of the benefits of using pipelines."""
    # (heading, bullets) pairs; every heading after the first starts with "\n"
    # so a blank line separates consecutive sections.
    sections = (
        ("1. 防止数据泄露:", (" - 管道确保预处理步骤只在训练集上fit", " - 测试集只进行transform操作")),
        ("\n2. 代码简洁性:", (" - 将多个步骤组合成一个对象", " - 减少中间变量和重复代码")),
        ("\n3. 参数调优便利性:", (" - 可以同时调优预处理和模型参数", " - 统一的接口进行网格搜索")),
        ("\n4. 可重现性:", (" - 完整保存预处理和模型状态", " - 确保部署时的一致性")),
    )
    print("=== 管道优势演示 ===")
    for heading, bullets in sections:
        print(heading)
        for bullet in bullets:
            print(bullet)
demonstrate_pipeline_benefits()
复杂管道构建
1. 多步骤预处理管道
python
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectKBest, f_classif
# Build a synthetic dataset that mixes numerical and categorical features.
def create_mixed_dataset():
    """Create a reproducible dataset with numerical and categorical features.

    Returns:
        tuple: ``(numerical_features, categorical_features, y, feature_names)``.

        - ``numerical_features``: float array of shape (1000, 3); column 0 has
          NaNs injected as missing values.
        - ``categorical_features``: object array of shape (1000, 2) holding
          'A'/'B'/'C'; missing entries in column 1 are ``np.nan``.
        - ``y``: 0/1 int labels.
        - ``feature_names``: the five column names.
    """
    np.random.seed(42)
    n_samples = 1000

    numerical_features = np.random.randn(n_samples, 3)
    # Use object dtype. A fixed-width '<U1' string array would silently store
    # None as the string 'N'. Once stacked with the numbers, it would also become
    # a '<U32' array, which SimpleImputer rejects.
    categorical_features = np.random.choice(['A', 'B', 'C'], size=(n_samples, 2)).astype(object)

    # Keep a clean copy of the feature that drives the label so the NaNs injected
    # below do not force those rows to y=0 (nan > 0 is False).
    clean_num_1 = numerical_features[:, 0].copy()

    # Inject missing values. Indices are drawn with replacement, so there are at
    # most 50 / 30 distinct rows. np.nan is SimpleImputer's default missing marker.
    numerical_features[np.random.choice(n_samples, 50), 0] = np.nan
    categorical_features[np.random.choice(n_samples, 30), 1] = np.nan

    # Label: 1 when num_1 + [cat_1 == 'A'] + small noise is positive.
    y = (clean_num_1 +
         (categorical_features[:, 0] == 'A').astype(int) +
         np.random.randn(n_samples) * 0.1 > 0).astype(int)

    feature_names = ['num_1', 'num_2', 'num_3', 'cat_1', 'cat_2']
    return numerical_features, categorical_features, y, feature_names


# Create the data.
num_features, cat_features, y_mixed, feature_names = create_mixed_dataset()
def build_preprocessing_pipeline():
    """Build the full mixed-type pipeline: per-column preprocessing, selection, model.

    Columns 0-2 are numerical (median imputation, then standardization).
    Columns 3-4 are categorical (constant 'missing' imputation, then one-hot
    encoding with the first level dropped). The encoded matrix is reduced to the
    5 best features by ANOVA F-score before a 100-tree random forest.

    Returns:
        Pipeline: an unfitted pipeline.
    """
    numeric_columns = [0, 1, 2]
    categorical_columns = [3, 4]

    numeric_branch = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
    ])
    categorical_branch = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(drop='first', sparse_output=False)),
    ])
    column_router = ColumnTransformer([
        ('num', numeric_branch, numeric_columns),
        ('cat', categorical_branch, categorical_columns),
    ])

    return Pipeline([
        ('preprocessor', column_router),
        ('feature_selection', SelectKBest(f_classif, k=5)),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
# 构建和测试管道
preprocessing_pipeline = build_preprocessing_pipeline()
# 准备混合数据
X_mixed = np.column_stack([num_features, cat_features])
X_train_mixed, X_test_mixed, y_train_mixed, y_test_mixed = train_test_split(
X_mixed, y_mixed, test_size=0.2, random_state=42
)
# 训练和评估
preprocessing_pipeline.fit(X_train_mixed, y_train_mixed)
y_pred_mixed = preprocessing_pipeline.predict(X_test_mixed)
accuracy_mixed = accuracy_score(y_test_mixed, y_pred_mixed)
print(f"混合数据管道准确率: {accuracy_mixed:.4f}")
# 查看管道结构
print("\n=== 管道结构 ===")
for i, (name, transformer) in enumerate(preprocessing_pipeline.steps):
print(f"步骤 {i+1}: {name} - {type(transformer).__name__}")2. 特征工程管道
python
from sklearn.preprocessing import PolynomialFeatures, FunctionTransformer
from sklearn.base import BaseEstimator, TransformerMixin
# Custom feature-engineering step.
class CustomFeatureEngineer(BaseEstimator, TransformerMixin):
    """Stateless transformer that appends engineered columns to a 2-D array.

    Parameters:
        add_polynomial: append ``X ** 2`` (one squared column per input column).
        add_interactions: append the product of the first two columns, only when
            the input has at least two columns.
    """

    def __init__(self, add_polynomial=True, add_interactions=True):
        self.add_polynomial = add_polynomial
        self.add_interactions = add_interactions

    def fit(self, X, y=None):
        # Nothing to learn: the output depends only on the input passed to transform.
        return self

    def transform(self, X):
        pieces = [X.copy()]
        if self.add_polynomial:
            pieces.append(X ** 2)
        if self.add_interactions and X.shape[1] >= 2:
            pieces.append((X[:, 0] * X[:, 1]).reshape(-1, 1))
        # With nothing appended, return the plain copy unchanged.
        return pieces[0] if len(pieces) == 1 else np.column_stack(pieces)
def create_feature_engineering_pipeline():
    """Build a pipeline that imputes, scales, engineers and selects features, then classifies.

    Steps:
        1. median imputation
        2. standardization
        3. signed log transform
        4. squared/interaction features (CustomFeatureEngineer)
        5. degree-2 polynomial expansion
        6. SelectKBest(f_classif, k=10)
        7. standardization
        8. random forest

    Returns:
        Pipeline: an unfitted pipeline.
    """

    def log_transform(X):
        """Signed log transform: ``sign(x) * log(1 + |x|)``.

        The input arrives standardized, so about half the values are negative.
        ``log1p(|x|)`` alone would map x and -x to the same value and discard
        which side of the mean a sample lies on; keeping the sign preserves it.
        """
        return np.sign(X) * np.log1p(np.abs(X))

    feature_pipeline = Pipeline([
        # Basic preprocessing
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler()),
        # Mathematical transform
        ('log_transform', FunctionTransformer(log_transform)),
        # Custom feature engineering
        ('custom_features', CustomFeatureEngineer(add_polynomial=True, add_interactions=True)),
        # Polynomial features
        ('polynomial', PolynomialFeatures(degree=2, include_bias=False, interaction_only=False)),
        # Feature selection
        ('feature_selection', SelectKBest(f_classif, k=10)),
        # Final standardization
        ('final_scaler', StandardScaler()),
        # Classifier
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
    ])
    return feature_pipeline
# Build the feature-engineering pipeline and evaluate it on the original iris split.
feature_eng_pipeline = create_feature_engineering_pipeline()
feature_eng_pipeline.fit(X_train, y_train)
y_pred_fe = feature_eng_pipeline.predict(X_test)
accuracy_fe = accuracy_score(y_test, y_pred_fe)
print(f"特征工程管道准确率: {accuracy_fe:.4f}")
# Trace how the number of features changes after each step.
print("\n=== 特征变化过程 ===")
X_temp = X_train.copy()
for i, (name, transformer) in enumerate(feature_eng_pipeline.steps[:-1]):  # skip the final classifier
    if hasattr(transformer, 'transform'):
        # Every step was already fitted by feature_eng_pipeline.fit above, so only
        # transform. Re-fitting a step here would overwrite the pipeline's fitted state.
        X_temp = transformer.transform(X_temp)
        print(f"步骤 {i+1} ({name}): {X_temp.shape[1]} 个特征")
管道参数调优
1. 网格搜索与管道
python
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
def pipeline_grid_search():
    """Exhaustively search feature-selection and random-forest settings in one pipeline.

    Keys follow the ``<step name>__<parameter>`` convention, so a single
    GridSearchCV tunes both the SelectKBest step and the classifier. Uses the
    module-level iris split (``X_train``/``y_train``/``X_test``/``y_test``).

    Returns:
        GridSearchCV: the fitted search object.
    """
    estimator = Pipeline([
        ('scaler', StandardScaler()),
        ('feature_selection', SelectKBest()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ])
    search_space = {
        # number of features kept by the selector
        'feature_selection__k': [2, 3, 4, 'all'],
        # random-forest capacity / regularization
        'classifier__n_estimators': [50, 100, 200],
        'classifier__max_depth': [3, 5, 10, None],
        'classifier__min_samples_split': [2, 5, 10],
    }
    search = GridSearchCV(
        estimator, search_space, cv=5, scoring='accuracy',
        n_jobs=-1, verbose=1,
    )

    print("开始网格搜索...")
    search.fit(X_train, y_train)
    print(f"\n最佳参数: {search.best_params_}")
    print(f"最佳交叉验证分数: {search.best_score_:.4f}")

    # Final check on data the search never saw.
    held_out = search.best_estimator_.score(X_test, y_test)
    print(f"测试集分数: {held_out:.4f}")
    return search
# 执行网格搜索
grid_results = pipeline_grid_search()2. 随机搜索优化
python
from scipy.stats import randint, uniform
def pipeline_random_search():
    """Randomly sample random-forest hyperparameters inside a scaling pipeline.

    Draws 50 configurations from scipy distributions and scores each with 5-fold
    CV accuracy. ``uniform(0.1, 0.9)`` is (loc, scale), so ``max_features``
    is sampled from [0.1, 1.0). Uses the module-level iris split.

    Returns:
        RandomizedSearchCV: the fitted search object.
    """
    estimator = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(random_state=42)),
    ])
    distributions = {
        'classifier__n_estimators': randint(50, 300),
        'classifier__max_depth': randint(3, 20),
        'classifier__min_samples_split': randint(2, 20),
        'classifier__min_samples_leaf': randint(1, 10),
        'classifier__max_features': uniform(0.1, 0.9),
    }
    search = RandomizedSearchCV(
        estimator, distributions, n_iter=50, cv=5,
        scoring='accuracy', n_jobs=-1, random_state=42, verbose=1,
    )

    print("开始随机搜索...")
    search.fit(X_train, y_train)
    print(f"\n最佳参数: {search.best_params_}")
    print(f"最佳交叉验证分数: {search.best_score_:.4f}")

    held_out = search.best_estimator_.score(X_test, y_test)
    print(f"测试集分数: {held_out:.4f}")
    return search
# Run the randomized search.
random_results = pipeline_random_search()
管道可视化和调试
1. 管道可视化
python
def visualize_pipeline(pipeline, X_sample=None, y_sample=None):
    """Print a pipeline's structure and, optionally, how a sample's shape evolves.

    Args:
        pipeline: an sklearn ``Pipeline``, fitted or not.
        X_sample: optional feature matrix pushed through every step except the last.
        y_sample: optional targets for ``X_sample``. Only used when an unfitted
            step must be fitted to trace the data flow; supervised steps such as
            ``SelectKBest`` cannot be fitted without it.

    Fitted steps only ``transform`` the sample. Unfitted steps are fitted on a
    clone, so the caller's pipeline is never modified. If such a step cannot be
    fitted (e.g. it needs ``y`` and none was given), a message is printed and
    the trace stops instead of raising.
    """
    from sklearn.base import clone
    from sklearn.exceptions import NotFittedError
    from sklearn.utils.validation import check_is_fitted

    print("=== 管道结构 ===")
    for i, (name, transformer) in enumerate(pipeline.steps):
        print(f"步骤 {i+1}: {name}")
        print(f" 类型: {type(transformer).__name__}")
        # Show only the step's own hyperparameters; deep=False leaves out the
        # nested "sub__param" keys of composite steps.
        if hasattr(transformer, 'get_params'):
            key_params = transformer.get_params(deep=False)
            if key_params:
                print(f" 主要参数: {key_params}")
        print()

    if X_sample is None:
        return

    print("=== 数据变化过程 ===")
    X_temp = X_sample.copy()
    print(f"输入数据形状: {X_temp.shape}")
    for i, (name, transformer) in enumerate(pipeline.steps[:-1]):  # skip the final estimator
        if not hasattr(transformer, 'transform'):
            continue
        try:
            check_is_fitted(transformer)
        except NotFittedError:
            try:
                X_temp = clone(transformer, safe=False).fit_transform(X_temp, y_sample)
            except (TypeError, ValueError) as exc:
                print(f"步骤 {i+1} ({name}) 无法在该样本上拟合, 停止追踪: {exc}")
                break
        else:
            X_temp = transformer.transform(X_temp)
        print(f"步骤 {i+1} ({name}) 后: {X_temp.shape}")
# 可视化管道
sample_pipeline = Pipeline([
('scaler', StandardScaler()),
('feature_selection', SelectKBest(k=3)),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
visualize_pipeline(sample_pipeline, X_train[:5]) # 使用前5个样本2. 管道调试工具
python
class DebugTransformer(BaseEstimator, TransformerMixin):
    """Pass-through transformer that prints diagnostics about the data flowing through it.

    Parameters:
        name: label printed in the section header.
        print_shape: print ``X.shape``.
        print_stats: print min/max/mean/std and the missing-value count.

    ``transform`` returns its input unchanged.
    """

    def __init__(self, name="Debug", print_shape=True, print_stats=True):
        self.name = name
        self.print_shape = print_shape
        self.print_stats = print_stats

    def fit(self, X, y=None):
        # Nothing to learn.
        return self

    def transform(self, X):
        print(f"\n=== {self.name} ===")
        if self.print_shape:
            print(f"数据形状: {X.shape}")
        if self.print_stats:
            # Compute statistics on a float ndarray view. This works for
            # DataFrames too, where X.min() would be a Series that cannot be
            # formatted with :.4f. The NaN-aware reductions skip missing entries
            # instead of reporting nan, which is exactly the case this probe is
            # meant to inspect.
            values = np.asarray(X, dtype=float)
            print(f"数据范围: [{np.nanmin(values):.4f}, {np.nanmax(values):.4f}]")
            print(f"数据均值: {np.nanmean(values):.4f}")
            print(f"数据标准差: {np.nanstd(values):.4f}")
            # Count missing values.
            if hasattr(X, 'isnull'):
                missing_count = X.isnull().sum().sum()
            else:
                missing_count = np.isnan(values).sum()
            print(f"缺失值数量: {missing_count}")
        return X
def create_debug_pipeline():
    """Return an unfitted pipeline with DebugTransformer probes between its stages.

    Probes sit before imputation and after imputation, scaling and feature
    selection. They print whenever data passes through them, including during
    ``fit``, because intermediate steps are run through ``fit_transform``.
    """
    return Pipeline([
        ('debug_input', DebugTransformer("输入数据")),
        ('imputer', SimpleImputer(strategy='median')),
        ('debug_after_impute', DebugTransformer("填充缺失值后")),
        ('scaler', StandardScaler()),
        ('debug_after_scale', DebugTransformer("标准化后")),
        ('feature_selection', SelectKBest(k=3)),
        ('debug_after_selection', DebugTransformer("特征选择后")),
        ('classifier', RandomForestClassifier(n_estimators=50, random_state=42)),
    ])
# Exercise the debug pipeline.
debug_pipe = create_debug_pipeline()
# Inject two missing values into a copy of the training data for the demonstration.
X_train_debug = X_train.copy()
X_train_debug[0, 0] = np.nan
X_train_debug[1, 1] = np.nan
print("=== 调试管道执行过程 ===")
debug_pipe.fit(X_train_debug, y_train)
管道持久化和部署
1. 管道保存和加载
python
import joblib
import pickle
from datetime import datetime
def save_pipeline(pipeline, model_name="model", save_format="joblib"):
    """Persist a trained pipeline plus a JSON metadata sidecar in the working directory.

    Args:
        pipeline: fitted pipeline. The names from its ``steps`` and its
            ``n_features_in_`` (or 'unknown') are recorded in the metadata.
        model_name: prefix of both output file names.
        save_format: "joblib" writes ``<name>_<timestamp>.joblib`` with joblib;
            any other value pickles to ``<name>_<timestamp>.pkl``.

    Returns:
        tuple[str, str]: (model file name, metadata file name).
    """
    import json

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stem = f"{model_name}_{timestamp}"

    if save_format != "joblib":
        filename = f"{stem}.pkl"
        with open(filename, 'wb') as fh:
            pickle.dump(pipeline, fh)
    else:
        filename = f"{stem}.joblib"
        joblib.dump(pipeline, filename)
    print(f"管道已保存为: {filename}")

    metadata_filename = f"{stem}_metadata.json"
    metadata = dict(
        model_name=model_name,
        timestamp=timestamp,
        pipeline_steps=[entry[0] for entry in pipeline.steps],
        feature_count=getattr(pipeline, 'n_features_in_', 'unknown'),
        save_format=save_format,
    )
    with open(metadata_filename, 'w') as fh:
        json.dump(metadata, fh, indent=2)
    print(f"元数据已保存为: {metadata_filename}")

    return filename, metadata_filename
def load_pipeline(filename):
    """Load a pipeline previously written by ``save_pipeline``.

    Names ending in ``.joblib`` are read with joblib; anything else is unpickled.

    Security: both joblib and pickle can execute arbitrary code while loading.
    Only load files that come from a trusted source.
    """
    if not filename.endswith('.joblib'):
        with open(filename, 'rb') as fh:
            restored = pickle.load(fh)
    else:
        restored = joblib.load(filename)
    print(f"管道已从 {filename} 加载")
    return restored
# 训练并保存管道
final_pipeline = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
final_pipeline.fit(X_train, y_train)
# 保存管道
model_file, metadata_file = save_pipeline(final_pipeline, "iris_classifier")
# 加载管道
loaded_pipeline = load_pipeline(model_file)
# 验证加载的管道
loaded_predictions = loaded_pipeline.predict(X_test)
original_predictions = final_pipeline.predict(X_test)
print(f"预测结果一致性: {np.array_equal(loaded_predictions, original_predictions)}")2. 生产环境管道
python
class ProductionPipeline:
    """Wrapper around a fitted pipeline that validates input before predicting.

    Args:
        pipeline: fitted estimator exposing ``predict`` and ``steps``
            (``predict_proba`` and ``n_features_in_`` are used when present).
        feature_names: optional list of input feature names (informational only).
        target_names: optional sequence indexed by the predicted class label to
            produce display names (assumes integer labels 0..k-1).
    """

    def __init__(self, pipeline, feature_names=None, target_names=None):
        self.pipeline = pipeline
        self.feature_names = feature_names
        self.target_names = target_names
        self.version = "1.0"
        self.created_at = datetime.now()

    def _validate_input(self, X):
        """Check X's type and feature count; return it as a 2-D input.

        Raises:
            ValueError: X is not an ndarray/DataFrame, or its column count
                differs from the pipeline's ``n_features_in_``.
        """
        if not isinstance(X, (np.ndarray, pd.DataFrame)):
            raise ValueError("输入必须是numpy数组或pandas DataFrame")
        # A 1-D array is treated as a single sample.
        if len(X.shape) == 1:
            X = X.reshape(1, -1)
        expected_features = getattr(self.pipeline, 'n_features_in_', None)
        if expected_features and X.shape[1] != expected_features:
            raise ValueError(f"期望 {expected_features} 个特征,但得到 {X.shape[1]} 个")
        return X

    def predict(self, X):
        """Validate X and return predictions, mapped to ``target_names`` when given.

        Raises:
            ValueError: invalid input (see ``_validate_input``).
            RuntimeError: the underlying pipeline failed; the original error is
                chained as ``__cause__``.
        """
        X = self._validate_input(X)
        try:
            predictions = self.pipeline.predict(X)
            if self.target_names:
                predictions = [self.target_names[pred] for pred in predictions]
        except Exception as e:
            raise RuntimeError(f"预测过程中出错: {str(e)}") from e
        return predictions

    def predict_proba(self, X):
        """Validate X exactly as ``predict`` does and return class probabilities.

        Raises:
            AttributeError: the pipeline has no ``predict_proba``.
            ValueError: invalid input (see ``_validate_input``).
            RuntimeError: the underlying pipeline failed (cause chained).
        """
        if not hasattr(self.pipeline, 'predict_proba'):
            raise AttributeError("管道不支持概率预测")
        X = self._validate_input(X)
        try:
            return self.pipeline.predict_proba(X)
        except Exception as e:
            raise RuntimeError(f"概率预测过程中出错: {str(e)}") from e

    def get_info(self):
        """Return a dict describing the wrapped model (version, steps, names, feature count)."""
        info = {
            'version': self.version,
            'created_at': self.created_at.isoformat(),
            'pipeline_steps': [step[0] for step in self.pipeline.steps],
            'feature_names': self.feature_names,
            'target_names': self.target_names,
            'n_features': getattr(self.pipeline, 'n_features_in_', 'unknown')
        }
        return info
# Wrap the trained pipeline for production use.
production_model = ProductionPipeline(
    pipeline=final_pipeline,
    feature_names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'],
    target_names=['setosa', 'versicolor', 'virginica']
)
# Exercise the wrapper.
print("=== 生产环境管道测试 ===")
print("模型信息:")
info = production_model.get_info()
for key, value in info.items():
    print(f" {key}: {value}")
# Single-sample prediction.
sample = X_test[0:1]
pred = production_model.predict(sample)
prob = production_model.predict_proba(sample)
print(f"\n单样本预测: {pred}")
print(f"预测概率: {prob}")
# Batch prediction.
batch_pred = production_model.predict(X_test[:5])
print(f"批量预测: {batch_pred}")
管道最佳实践
1. 管道设计原则
python
def pipeline_best_practices():
    """Print a fixed checklist of pipeline design best practices (six sections)."""
    # The literal's lines are deliberately flush-left: the printed text must not
    # pick up the function's indentation.
    print("""
=== 管道设计最佳实践 ===
1. 数据预处理原则:
✓ 所有预处理步骤都应该在管道内
✓ 避免在管道外进行数据变换
✓ 使用ColumnTransformer处理混合类型数据
✓ 确保预处理步骤的可逆性(如果需要)
2. 特征工程原则:
✓ 将特征工程步骤集成到管道中
✓ 使用自定义Transformer封装复杂逻辑
✓ 保持特征工程的可解释性
✓ 避免数据泄露(不要使用未来信息)
3. 模型选择原则:
✓ 将模型作为管道的最后一步
✓ 使用网格搜索优化整个管道
✓ 考虑模型的计算复杂度
✓ 保持模型的可解释性需求
4. 验证和测试原则:
✓ 使用交叉验证评估管道性能
✓ 在独立测试集上验证最终性能
✓ 检查管道的鲁棒性
✓ 监控管道在生产环境中的表现
5. 部署和维护原则:
✓ 版本控制管道和数据
✓ 监控模型性能衰减
✓ 建立模型重训练机制
✓ 保持管道的向后兼容性
6. 性能优化原则:
✓ 使用并行处理(n_jobs=-1)
✓ 缓存中间结果(memory参数)
✓ 选择合适的数据类型
✓ 优化内存使用
""")
pipeline_best_practices()2. 常见错误和解决方案
python
def common_pipeline_mistakes():
    """Print five common pipeline mistakes, each with a wrong and a corrected snippet."""
    # The literal's lines are deliberately flush-left so the printed snippets are
    # not indented by the function body.
    print("""
=== 常见管道错误和解决方案 ===
错误1: 在管道外预处理数据
❌ 错误做法:
X_scaled = StandardScaler().fit_transform(X_train)
model.fit(X_scaled, y_train)
✅ 正确做法:
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', LogisticRegression())
])
pipeline.fit(X_train, y_train)
错误2: 数据泄露
❌ 错误做法:
# 在整个数据集上进行特征选择
selector = SelectKBest(k=5)
X_selected = selector.fit_transform(X, y)
X_train, X_test, y_train, y_test = train_test_split(X_selected, y)
✅ 正确做法:
X_train, X_test, y_train, y_test = train_test_split(X, y)
pipeline = Pipeline([
('selector', SelectKBest(k=5)),
('model', LogisticRegression())
])
pipeline.fit(X_train, y_train)
错误3: 忘记处理分类特征
❌ 错误做法:
# 直接将字符串特征传给模型
pipeline = Pipeline([
('scaler', StandardScaler()), # 会报错
('model', LogisticRegression())
])
✅ 正确做法:
preprocessor = ColumnTransformer([
('num', StandardScaler(), numerical_columns),
('cat', OneHotEncoder(), categorical_columns)
])
pipeline = Pipeline([
('preprocessor', preprocessor),
('model', LogisticRegression())
])
错误4: 参数调优时的数据泄露
❌ 错误做法:
# 在整个数据集上进行网格搜索
grid_search = GridSearchCV(pipeline, param_grid)
grid_search.fit(X, y) # 数据泄露
✅ 正确做法:
X_train, X_test, y_train, y_test = train_test_split(X, y)
grid_search = GridSearchCV(pipeline, param_grid, cv=5)
grid_search.fit(X_train, y_train)
test_score = grid_search.score(X_test, y_test)
错误5: 不保存预处理器状态
❌ 错误做法:
# 只保存模型,不保存预处理器
joblib.dump(model, 'model.pkl')
✅ 正确做法:
# 保存整个管道
joblib.dump(pipeline, 'pipeline.pkl')
""")
common_pipeline_mistakes()
高级管道技巧
1. 条件管道
python
from sklearn.base import BaseEstimator, TransformerMixin
class ConditionalTransformer(BaseEstimator, TransformerMixin):
    """Transformer that picks one of two sub-transformers at fit time.

    Parameters:
        condition_func: callable evaluated on the training X in ``fit``; a truthy
            result selects ``transformer_true``, otherwise ``transformer_false``.
        transformer_true: template used when the condition holds.
        transformer_false: template used otherwise.

    Attributes set by ``fit``:
        use_true_transformer: the value returned by ``condition_func``.
        transformer_: fitted clone of the selected template.
    """

    def __init__(self, condition_func, transformer_true, transformer_false):
        self.condition_func = condition_func
        self.transformer_true = transformer_true
        self.transformer_false = transformer_false
        # Not a constructor parameter. Kept (None until fit) for code that reads it.
        self.use_true_transformer = None

    def fit(self, X, y=None):
        from sklearn.base import clone

        # Decide which branch to use from the training data.
        self.use_true_transformer = self.condition_func(X)
        chosen = self.transformer_true if self.use_true_transformer else self.transformer_false
        # Fit a clone so the user-supplied templates are never mutated (sklearn
        # convention). Two instances sharing one template then cannot overwrite
        # each other's fitted state. safe=False falls back to deepcopy for
        # non-sklearn objects.
        self.transformer_ = clone(chosen, safe=False)
        self.transformer_.fit(X, y)
        return self

    def transform(self, X):
        from sklearn.utils.validation import check_is_fitted

        check_is_fitted(self, 'transformer_')
        return self.transformer_.transform(X)
# Example: choose a feature-selection strategy based on how many columns the data has.
def is_high_dimensional(X):
    """Return True when X has more than 10 columns (``X.shape[1] > 10``)."""
    column_limit = 10
    n_columns = X.shape[1]
    return n_columns > column_limit
# 创建条件管道
conditional_pipeline = Pipeline([
('scaler', StandardScaler()),
('conditional_selector', ConditionalTransformer(
condition_func=is_high_dimensional,
transformer_true=SelectKBest(k=10), # 高维时选择前10个特征
transformer_false=SelectKBest(k='all') # 低维时保留所有特征
)),
('classifier', RandomForestClassifier(random_state=42))
])
print("=== 条件管道测试 ===")
conditional_pipeline.fit(X_train, y_train)
cond_pred = conditional_pipeline.predict(X_test)
cond_accuracy = accuracy_score(y_test, cond_pred)
print(f"条件管道准确率: {cond_accuracy:.4f}")2. 并行管道
python
from sklearn.pipeline import FeatureUnion
def create_parallel_pipeline():
    """Build a pipeline whose features come from two parallel branches joined by FeatureUnion.

    Branch 1 keeps the 2 best raw features (SelectKBest's default f_classif
    scoring) and scales them. Branch 2 expands to degree-2 polynomial features,
    keeps the 3 best and scales them. The concatenated columns feed a 100-tree
    random forest.

    Returns:
        Pipeline: an unfitted pipeline.
    """
    raw_branch = Pipeline([
        ('selector1', SelectKBest(k=2)),
        ('scaler1', StandardScaler()),
    ])
    poly_branch = Pipeline([
        ('poly', PolynomialFeatures(degree=2, include_bias=False)),
        ('selector2', SelectKBest(k=3)),
        ('scaler2', StandardScaler()),
    ])
    union = FeatureUnion([
        ('statistical_features', raw_branch),
        ('polynomial_features', poly_branch),
    ])
    return Pipeline([
        ('features', union),
        ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
    ])
# Fit and evaluate the parallel pipeline.
parallel_pipe = create_parallel_pipeline()
parallel_pipe.fit(X_train, y_train)
parallel_pred = parallel_pipe.predict(X_test)
parallel_accuracy = accuracy_score(y_test, parallel_pred)
print(f"并行管道准确率: {parallel_accuracy:.4f}")
# Inspect the dimensionality after the FeatureUnion concatenates both branches.
feature_union = parallel_pipe.named_steps['features']
X_transformed = feature_union.transform(X_train)
print(f"并行特征处理后的维度: {X_transformed.shape}")
总结
管道是机器学习工作流的核心工具,它提供了:
主要优势:
- 防止数据泄露:确保预处理步骤只在训练数据上拟合(交叉验证时即每一折的训练部分),验证/测试数据只做transform
- 代码简洁:将多个步骤组合成统一接口
- 便于调优:可以同时优化预处理和模型参数
- 易于部署:保存完整的处理流程
- 提高可重现性:确保处理步骤的一致性
最佳实践:
- 将所有预处理步骤包含在管道中
- 使用ColumnTransformer处理混合数据类型
- 通过网格搜索优化整个管道
- 保存完整管道而不仅仅是模型
- 在生产环境中添加输入验证
高级技巧:
- 自定义Transformer封装复杂逻辑
- 使用FeatureUnion进行并行特征处理
- 条件管道根据数据特征选择处理方式
- 调试管道监控数据变化过程
下一章我们将学习超参数调优,深入了解如何系统地优化模型性能。