Skip to content

模型选择策略

在机器学习项目中,选择合适的算法是成功的关键。不同的算法适用于不同类型的问题,本章将帮助你建立系统的模型选择思维框架。

模型选择的基本原则

1. 问题类型驱动选择

python
import numpy as np
import pandas as pd
from sklearn.datasets import make_classification, make_regression, make_blobs
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# 生成不同类型的数据集进行演示
def create_sample_datasets():
    # 分类数据集
    X_clf, y_clf = make_classification(
        n_samples=1000, n_features=2, n_redundant=0, 
        n_informative=2, n_clusters_per_class=1, random_state=42
    )
    
    # 回归数据集
    X_reg, y_reg = make_regression(
        n_samples=1000, n_features=1, noise=10, random_state=42
    )
    
    # 聚类数据集
    X_cluster, y_cluster = make_blobs(
        n_samples=300, centers=4, n_features=2, 
        random_state=42, cluster_std=0.60
    )
    
    return (X_clf, y_clf), (X_reg, y_reg), (X_cluster, y_cluster)

# 创建示例数据
(X_clf, y_clf), (X_reg, y_reg), (X_cluster, y_cluster) = create_sample_datasets()

# 可视化不同类型的问题
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# 分类问题
axes[0].scatter(X_clf[:, 0], X_clf[:, 1], c=y_clf, cmap='viridis')
axes[0].set_title('分类问题')
axes[0].set_xlabel('特征1')
axes[0].set_ylabel('特征2')

# 回归问题
axes[1].scatter(X_reg, y_reg, alpha=0.6)
axes[1].set_title('回归问题')
axes[1].set_xlabel('特征')
axes[1].set_ylabel('目标值')

# 聚类问题
axes[2].scatter(X_cluster[:, 0], X_cluster[:, 1], c=y_cluster, cmap='viridis')
axes[2].set_title('聚类问题')
axes[2].set_xlabel('特征1')
axes[2].set_ylabel('特征2')

plt.tight_layout()
plt.show()

2. 数据特征分析

python
def analyze_dataset(X, y=None, dataset_name="数据集"):
    """分析数据集特征"""
    print(f"\n=== {dataset_name} 分析 ===")
    print(f"样本数量: {X.shape[0]}")
    print(f"特征数量: {X.shape[1]}")
    
    if y is not None:
        if len(np.unique(y)) < 20:  # 可能是分类问题
            print(f"类别数量: {len(np.unique(y))}")
            print(f"类别分布: {np.bincount(y)}")
        else:  # 可能是回归问题
            print(f"目标值范围: [{y.min():.2f}, {y.max():.2f}]")
            print(f"目标值标准差: {y.std():.2f}")
    
    # 特征统计
    print(f"特征均值范围: [{X.mean(axis=0).min():.2f}, {X.mean(axis=0).max():.2f}]")
    print(f"特征标准差范围: [{X.std(axis=0).min():.2f}, {X.std(axis=0).max():.2f}]")
    
    # 缺失值检查
    if hasattr(X, 'isnull'):
        missing_count = X.isnull().sum().sum()
        print(f"缺失值数量: {missing_count}")

# 分析示例数据集
analyze_dataset(X_clf, y_clf, "分类数据集")
analyze_dataset(X_reg, y_reg, "回归数据集")
analyze_dataset(X_cluster, dataset_name="聚类数据集")

算法选择指南

分类算法选择

python
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

def compare_classification_algorithms(X, y):
    """比较不同分类算法的性能"""
    
    # 准备数据
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # 定义算法
    algorithms = {
        '逻辑回归': Pipeline([
            ('scaler', StandardScaler()),
            ('clf', LogisticRegression(random_state=42))
        ]),
        '决策树': DecisionTreeClassifier(random_state=42),
        '随机森林': RandomForestClassifier(n_estimators=100, random_state=42),
        '梯度提升': GradientBoostingClassifier(random_state=42),
        'SVM': Pipeline([
            ('scaler', StandardScaler()),
            ('clf', SVC(random_state=42))
        ]),
        'K近邻': Pipeline([
            ('scaler', StandardScaler()),
            ('clf', KNeighborsClassifier())
        ]),
        '朴素贝叶斯': GaussianNB()
    }
    
    # 比较性能
    results = {}
    for name, algorithm in algorithms.items():
        scores = cross_val_score(algorithm, X_train, y_train, cv=5, scoring='accuracy')
        results[name] = {
            'mean': scores.mean(),
            'std': scores.std(),
            'scores': scores
        }
    
    # 显示结果
    print("分类算法性能比较:")
    print("-" * 50)
    for name, result in sorted(results.items(), key=lambda x: x[1]['mean'], reverse=True):
        print(f"{name:12s}: {result['mean']:.4f} (+/- {result['std']*2:.4f})")
    
    return results

# 运行比较
classification_results = compare_classification_algorithms(X_clf, y_clf)

回归算法选择

python
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor

def compare_regression_algorithms(X, y):
    """比较不同回归算法的性能"""
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    algorithms = {
        '线性回归': LinearRegression(),
        '岭回归': Ridge(alpha=1.0),
        'Lasso回归': Lasso(alpha=1.0),
        '弹性网络': ElasticNet(alpha=1.0),
        '决策树': DecisionTreeRegressor(random_state=42),
        '随机森林': RandomForestRegressor(n_estimators=100, random_state=42),
        '梯度提升': GradientBoostingRegressor(random_state=42),
        'SVR': Pipeline([
            ('scaler', StandardScaler()),
            ('reg', SVR())
        ]),
        'K近邻': Pipeline([
            ('scaler', StandardScaler()),
            ('reg', KNeighborsRegressor())
        ])
    }
    
    results = {}
    for name, algorithm in algorithms.items():
        scores = cross_val_score(algorithm, X_train, y_train, cv=5, scoring='r2')
        results[name] = {
            'mean': scores.mean(),
            'std': scores.std(),
            'scores': scores
        }
    
    print("回归算法性能比较 (R² 分数):")
    print("-" * 50)
    for name, result in sorted(results.items(), key=lambda x: x[1]['mean'], reverse=True):
        print(f"{name:12s}: {result['mean']:.4f} (+/- {result['std']*2:.4f})")
    
    return results

# 运行比较
regression_results = compare_regression_algorithms(X_reg, y_reg)

基于数据特征的选择策略

1. 数据量大小

python
def recommend_by_data_size(n_samples, n_features):
    """基于数据量推荐算法"""
    
    print(f"数据集大小: {n_samples} 样本, {n_features} 特征")
    
    if n_samples < 1000:
        print("小数据集推荐:")
        print("- 朴素贝叶斯 (快速, 适合小样本)")
        print("- K近邻 (简单, 无需训练)")
        print("- 线性模型 (避免过拟合)")
        
    elif n_samples < 10000:
        print("中等数据集推荐:")
        print("- 随机森林 (平衡性能和解释性)")
        print("- 梯度提升 (通常性能较好)")
        print("- SVM (适合中等规模数据)")
        
    else:
        print("大数据集推荐:")
        print("- 线性模型 (训练快速)")
        print("- 随机森林 (可并行训练)")
        print("- 在线学习算法")
    
    if n_features > n_samples:
        print("\n高维数据 (特征数 > 样本数):")
        print("- 正则化线性模型 (Lasso, Ridge)")
        print("- 朴素贝叶斯")
        print("- 考虑降维技术")

# 示例推荐
recommend_by_data_size(1000, 20)
recommend_by_data_size(100000, 50)
recommend_by_data_size(500, 1000)

2. 特征类型分析

python
def analyze_feature_types(X, feature_names=None):
    """分析特征类型并推荐算法"""
    
    if feature_names is None:
        feature_names = [f"特征_{i}" for i in range(X.shape[1])]
    
    # 检测数值特征的分布
    numerical_features = []
    categorical_features = []
    
    for i, name in enumerate(feature_names):
        unique_values = len(np.unique(X[:, i]))
        if unique_values < 10:  # 可能是分类特征
            categorical_features.append(name)
        else:
            numerical_features.append(name)
    
    print(f"数值特征: {len(numerical_features)} 个")
    print(f"分类特征: {len(categorical_features)} 个")
    
    # 基于特征类型推荐算法
    if len(categorical_features) > len(numerical_features):
        print("\n分类特征较多,推荐:")
        print("- 朴素贝叶斯")
        print("- 决策树")
        print("- 随机森林")
    else:
        print("\n数值特征较多,推荐:")
        print("- 线性模型")
        print("- SVM")
        print("- K近邻")
    
    return numerical_features, categorical_features

# 分析示例数据
numerical_features, categorical_features = analyze_feature_types(X_clf)

模型复杂度与性能权衡

python
from sklearn.metrics import accuracy_score, mean_squared_error
import time

def evaluate_complexity_performance(X, y, problem_type='classification'):
    """评估模型复杂度与性能的权衡"""
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    if problem_type == 'classification':
        models = {
            '朴素贝叶斯': GaussianNB(),
            '逻辑回归': LogisticRegression(random_state=42),
            '决策树': DecisionTreeClassifier(random_state=42),
            '随机森林': RandomForestClassifier(n_estimators=100, random_state=42),
            'SVM': SVC(random_state=42)
        }
        metric_func = accuracy_score
        metric_name = '准确率'
    else:
        models = {
            '线性回归': LinearRegression(),
            '岭回归': Ridge(),
            '决策树': DecisionTreeRegressor(random_state=42),
            '随机森林': RandomForestRegressor(n_estimators=100, random_state=42),
            'SVR': SVR()
        }
        metric_func = lambda y_true, y_pred: -mean_squared_error(y_true, y_pred)  # 负MSE,越大越好
        metric_name = '负MSE'
    
    results = []
    
    for name, model in models.items():
        # 训练时间
        start_time = time.time()
        model.fit(X_train, y_train)
        train_time = time.time() - start_time
        
        # 预测时间
        start_time = time.time()
        y_pred = model.predict(X_test)
        predict_time = time.time() - start_time
        
        # 性能指标
        performance = metric_func(y_test, y_pred)
        
        # 模型复杂度(参数数量的近似)
        complexity = getattr(model, 'n_features_in_', X.shape[1])
        if hasattr(model, 'coef_'):
            complexity = np.prod(model.coef_.shape)
        elif hasattr(model, 'tree_'):
            complexity = model.tree_.node_count
        elif hasattr(model, 'estimators_'):
            complexity = len(model.estimators_) * 100  # 近似值
        
        results.append({
            'model': name,
            'performance': performance,
            'train_time': train_time,
            'predict_time': predict_time,
            'complexity': complexity
        })
    
    # 显示结果
    results_df = pd.DataFrame(results)
    results_df = results_df.sort_values('performance', ascending=False)
    
    print(f"模型性能与复杂度比较 ({metric_name}):")
    print("-" * 80)
    print(f"{'模型':<12} {'性能':<10} {'训练时间':<10} {'预测时间':<10} {'复杂度':<10}")
    print("-" * 80)
    
    for _, row in results_df.iterrows():
        print(f"{row['model']:<12} {row['performance']:<10.4f} {row['train_time']:<10.4f} "
              f"{row['predict_time']:<10.4f} {row['complexity']:<10.0f}")
    
    return results_df

# 评估分类问题
print("=== 分类问题评估 ===")
clf_results = evaluate_complexity_performance(X_clf, y_clf, 'classification')

print("\n=== 回归问题评估 ===")
reg_results = evaluate_complexity_performance(X_reg, y_reg, 'regression')

集成学习策略

python
from sklearn.ensemble import VotingClassifier, VotingRegressor, StackingClassifier
from sklearn.model_selection import cross_val_score

def create_ensemble_models(X, y, problem_type='classification'):
    """创建集成模型"""
    
    if problem_type == 'classification':
        # 基础分类器
        base_models = [
            ('lr', LogisticRegression(random_state=42)),
            ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
            ('svm', SVC(probability=True, random_state=42))
        ]
        
        # 投票集成
        voting_clf = VotingClassifier(
            estimators=base_models,
            voting='soft'  # 使用概率投票
        )
        
        # 堆叠集成
        stacking_clf = StackingClassifier(
            estimators=base_models,
            final_estimator=LogisticRegression(),
            cv=5
        )
        
        models = {
            '投票集成': voting_clf,
            '堆叠集成': stacking_clf
        }
        
        # 添加基础模型进行比较
        for name, model in base_models:
            models[f'基础模型_{name}'] = model
            
        scoring = 'accuracy'
        
    else:
        # 基础回归器
        base_models = [
            ('lr', LinearRegression()),
            ('rf', RandomForestRegressor(n_estimators=100, random_state=42)),
            ('svr', SVR())
        ]
        
        # 投票集成
        voting_reg = VotingRegressor(estimators=base_models)
        
        models = {
            '投票集成': voting_reg
        }
        
        # 添加基础模型进行比较
        for name, model in base_models:
            models[f'基础模型_{name}'] = model
            
        scoring = 'r2'
    
    # 评估所有模型
    results = {}
    for name, model in models.items():
        scores = cross_val_score(model, X, y, cv=5, scoring=scoring)
        results[name] = {
            'mean': scores.mean(),
            'std': scores.std()
        }
    
    print(f"集成学习效果比较 ({scoring}):")
    print("-" * 50)
    for name, result in sorted(results.items(), key=lambda x: x[1]['mean'], reverse=True):
        print(f"{name:<15}: {result['mean']:.4f} (+/- {result['std']*2:.4f})")
    
    return results

# 测试集成学习
print("=== 分类集成学习 ===")
clf_ensemble_results = create_ensemble_models(X_clf, y_clf, 'classification')

print("\n=== 回归集成学习 ===")
reg_ensemble_results = create_ensemble_models(X_reg, y_reg, 'regression')

模型选择决策树

python
def model_selection_guide():
    """模型选择决策指南"""
    
    guide = """
    模型选择决策树:
    
    1. 问题类型?
       ├── 分类问题
       │   ├── 样本数 < 1000? → 朴素贝叶斯, K近邻
       │   ├── 需要概率输出? → 逻辑回归, 随机森林
       │   ├── 需要解释性? → 决策树, 逻辑回归
       │   └── 追求最高性能? → 随机森林, 梯度提升, 集成方法

       ├── 回归问题
       │   ├── 线性关系? → 线性回归, 岭回归
       │   ├── 特征选择需求? → Lasso回归
       │   ├── 非线性关系? → 随机森林, 梯度提升
       │   └── 高维数据? → 正则化线性模型

       └── 聚类问题
           ├── 知道聚类数? → K-Means
           ├── 不规则形状? → DBSCAN
           └── 层次结构? → 层次聚类
    
    2. 数据特征?
       ├── 高维稀疏? → 线性模型, 朴素贝叶斯
       ├── 混合特征类型? → 决策树, 随机森林
       ├── 大量缺失值? → 随机森林, 梯度提升
       └── 噪声较多? → 集成方法
    
    3. 性能要求?
       ├── 训练速度优先? → 朴素贝叶斯, 线性模型
       ├── 预测速度优先? → 线性模型, K近邻
       ├── 内存限制? → 线性模型, 朴素贝叶斯
       └── 最高准确率? → 集成方法, 深度学习
    
    4. 解释性要求?
       ├── 高解释性? → 线性模型, 决策树
       ├── 中等解释性? → 随机森林 (特征重要性)
       └── 无解释性要求? → SVM, 集成方法
    """
    
    print(guide)

# 显示决策指南
model_selection_guide()

自动化模型选择

python
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report, regression

def auto_model_selection(X, y, problem_type='auto'):
    """自动模型选择和调优"""
    
    # 自动检测问题类型
    if problem_type == 'auto':
        if len(np.unique(y)) < 20 and y.dtype in ['int64', 'int32', 'object']:
            problem_type = 'classification'
        else:
            problem_type = 'regression'
    
    print(f"检测到问题类型: {problem_type}")
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    if problem_type == 'classification':
        # 分类算法和参数网格
        models_params = {
            'RandomForest': {
                'model': RandomForestClassifier(random_state=42),
                'params': {
                    'n_estimators': [50, 100, 200],
                    'max_depth': [5, 10, None]
                }
            },
            'GradientBoosting': {
                'model': GradientBoostingClassifier(random_state=42),
                'params': {
                    'n_estimators': [50, 100],
                    'learning_rate': [0.1, 0.2]
                }
            },
            'SVM': {
                'model': Pipeline([
                    ('scaler', StandardScaler()),
                    ('svm', SVC(random_state=42))
                ]),
                'params': {
                    'svm__C': [0.1, 1, 10],
                    'svm__kernel': ['rbf', 'linear']
                }
            }
        }
        scoring = 'accuracy'
        
    else:
        # 回归算法和参数网格
        models_params = {
            'RandomForest': {
                'model': RandomForestRegressor(random_state=42),
                'params': {
                    'n_estimators': [50, 100, 200],
                    'max_depth': [5, 10, None]
                }
            },
            'GradientBoosting': {
                'model': GradientBoostingRegressor(random_state=42),
                'params': {
                    'n_estimators': [50, 100],
                    'learning_rate': [0.1, 0.2]
                }
            },
            'Ridge': {
                'model': Ridge(),
                'params': {
                    'alpha': [0.1, 1.0, 10.0]
                }
            }
        }
        scoring = 'r2'
    
    # 自动搜索最佳模型
    best_score = -np.inf
    best_model = None
    best_name = None
    
    results = {}
    
    for name, config in models_params.items():
        print(f"\n正在测试 {name}...")
        
        grid_search = GridSearchCV(
            config['model'],
            config['params'],
            cv=5,
            scoring=scoring,
            n_jobs=-1
        )
        
        grid_search.fit(X_train, y_train)
        
        if grid_search.best_score_ > best_score:
            best_score = grid_search.best_score_
            best_model = grid_search.best_estimator_
            best_name = name
        
        results[name] = {
            'best_score': grid_search.best_score_,
            'best_params': grid_search.best_params_,
            'model': grid_search.best_estimator_
        }
    
    # 显示结果
    print(f"\n=== 自动模型选择结果 ===")
    print(f"最佳模型: {best_name}")
    print(f"最佳交叉验证得分: {best_score:.4f}")
    print(f"最佳参数: {results[best_name]['best_params']}")
    
    # 在测试集上评估
    test_score = best_model.score(X_test, y_test)
    print(f"测试集得分: {test_score:.4f}")
    
    return best_model, results

# 自动选择最佳模型
best_clf_model, clf_auto_results = auto_model_selection(X_clf, y_clf)

总结

模型选择是一个系统性的过程,需要考虑多个因素:

  1. 问题类型:分类、回归还是聚类
  2. 数据特征:样本数量、特征数量、数据类型
  3. 性能要求:准确率、速度、内存使用
  4. 解释性需求:是否需要理解模型决策过程
  5. 资源限制:计算时间、存储空间

选择建议

  • 从简单模型开始(线性模型、朴素贝叶斯)
  • 逐步尝试复杂模型(随机森林、梯度提升)
  • 使用交叉验证评估性能
  • 考虑集成方法提升性能
  • 根据实际需求平衡性能和复杂度

下一章我们将学习性能指标详解,深入了解如何评估和比较不同模型的性能。

本站内容仅供学习和研究使用。