
# Chapter 14: Cross-Validation

Cross-validation is a key technique in machine learning for evaluating model performance. By splitting the data multiple times, it yields more reliable performance estimates and helps us choose the best model and parameters. This chapter covers the main cross-validation methods and their applications in detail.

## 14.1 What Is Cross-Validation?

Cross-validation is a statistical method for assessing the generalization ability of a machine learning model. It partitions the data into several subsets and rotates which subsets serve as the training set and the validation set.

### 14.1.1 Why Do We Need Cross-Validation?

- **Avoid overfitting**: a single train/test split can produce an overly optimistic performance estimate
- **Use the data fully**: every sample is used for both training and validation
- **Obtain stable estimates**: averaging over multiple validation rounds is more reliable
- **Model selection**: compare the performance of different models and parameters

### 14.1.2 Advantages of Cross-Validation

- **More reliable performance estimates**: reduces the influence of randomness
- **Better model selection**: decisions are based on multiple validation rounds
- **High data utilization**: no data is wasted on a fixed hold-out set
- **Overfitting detection**: the gap between training and validation performance becomes visible

### 14.1.3 Types of Cross-Validation

- **K-fold cross-validation**: the most widely used method
- **Leave-one-out cross-validation**: one sample is held out per round
- **Stratified cross-validation**: preserves class proportions in each fold
- **Time series cross-validation**: respects temporal order
- **Group cross-validation**: respects group structure in the data (a minimal splitter sketch follows this list)
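
Before diving in, it may help to see what a splitter object actually produces. The minimal sketch below (an illustration, not from the original text) shows that each `KFold` round is just a pair of index arrays; every other splitter in this chapter follows the same `split()` protocol.

```python
from sklearn.model_selection import KFold
import numpy as np

X_toy = np.arange(20).reshape(10, 2)  # 10 samples, 2 features

# Each iteration yields (train_indices, validation_indices)
for fold, (train_idx, val_idx) in enumerate(KFold(n_splits=5).split(X_toy)):
    print(f"Fold {fold + 1}: train={train_idx}, validate={val_idx}")
```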

## 14.2 Setting Up the Environment and Data

```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import load_iris, load_wine, load_breast_cancer, make_classification
from sklearn.model_selection import (
    cross_val_score, cross_validate, KFold, StratifiedKFold, 
    LeaveOneOut, LeavePOut, ShuffleSplit, StratifiedShuffleSplit,
    TimeSeriesSplit, GroupKFold, train_test_split, GridSearchCV,
    learning_curve, validation_curve
)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.pipeline import Pipeline
import warnings
warnings.filterwarnings('ignore')

# Fix the random seed for reproducibility
np.random.seed(42)

# Plot style and font settings (SimHei provides CJK glyph support)
plt.style.use('seaborn-v0_8')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
```

## 14.3 K-Fold Cross-Validation

### 14.3.1 Basic K-Fold Cross-Validation

```python
def demonstrate_k_fold_cv():
    """Demonstrate the basic principle of K-fold cross-validation."""
    
    print("K-fold cross-validation basics:")
    print("=" * 25)
    
    # Load the iris dataset
    iris = load_iris()
    X, y = iris.data, iris.target
    
    print(f"Dataset shape: {X.shape}")
    print(f"Class distribution: {np.bincount(y)}")
    
    # Create a classifier
    clf = LogisticRegression(random_state=42, max_iter=1000)
    
    # Different values of K
    k_values = [3, 5, 10]
    
    print(f"\nCross-validation results for different K values:")
    print("K\tMean accuracy\tStd dev\t\tPer-fold scores")
    print("-" * 60)
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('K-Fold Cross-Validation Demo', fontsize=16)
    
    for i, k in enumerate(k_values):
        # K-fold cross-validation
        kfold = KFold(n_splits=k, shuffle=True, random_state=42)
        cv_scores = cross_val_score(clf, X, y, cv=kfold, scoring='accuracy')
        
        mean_score = np.mean(cv_scores)
        std_score = np.std(cv_scores)
        
        print(f"{k}\t{mean_score:.4f}\t\t{std_score:.4f}\t\t{cv_scores}")
        
        # Visualize the per-fold scores
        row = i // 2
        col = i % 2
        
        axes[row, col].bar(range(1, k+1), cv_scores, alpha=0.7, color='skyblue')
        axes[row, col].axhline(y=mean_score, color='red', linestyle='--', 
                               label=f'Mean: {mean_score:.3f}')
        axes[row, col].set_title(f'{k}-fold cross-validation')
        axes[row, col].set_xlabel('Fold')
        axes[row, col].set_ylabel('Accuracy')
        axes[row, col].legend()
        axes[row, col].grid(True, alpha=0.3)
        axes[row, col].set_ylim(0.8, 1.0)
    
    # Visualize the data-splitting process
    axes[1, 1].remove()
    ax_split = fig.add_subplot(2, 2, 4)
    
    # Illustrate the splits of 5-fold cross-validation
    kfold_demo = KFold(n_splits=5, shuffle=True, random_state=42)
    
    fold_colors = ['red', 'blue', 'green', 'orange', 'purple']
    y_pos = 0
    
    for fold, (train_idx, val_idx) in enumerate(kfold_demo.split(X)):
        # Draw the training portion
        ax_split.barh(y_pos, len(train_idx), left=0, height=0.8, 
                      color=fold_colors[fold], alpha=0.3, label='Training set' if fold == 0 else "")
        
        # Draw the validation portion
        ax_split.barh(y_pos, len(val_idx), left=len(train_idx), height=0.8, 
                      color=fold_colors[fold], alpha=0.8, label='Validation set' if fold == 0 else "")
        
        y_pos += 1
    
    ax_split.set_title('5-fold cross-validation splits')
    ax_split.set_xlabel('Number of samples')
    ax_split.set_ylabel('Fold')
    ax_split.set_yticks(range(5))
    ax_split.set_yticklabels([f'Fold {i+1}' for i in range(5)])
    
    plt.tight_layout()
    plt.show()
    
    return cv_scores

cv_scores = demonstrate_k_fold_cv()
```

### 14.3.2 Stratified K-Fold Cross-Validation

```python
def stratified_k_fold_demo():
    """Demonstrate stratified K-fold cross-validation."""
    
    print("Stratified K-fold cross-validation:")
    print("Keeps the class proportions in every fold consistent with the full dataset")
    
    # Create an imbalanced dataset
    X_imbalanced, y_imbalanced = make_classification(
        n_samples=1000, n_features=20, n_informative=10,
        n_classes=3, weights=[0.6, 0.3, 0.1],  # imbalanced classes
        random_state=42
    )
    
    print(f"\nImbalanced dataset:")
    print(f"Total samples: {len(X_imbalanced)}")
    unique, counts = np.unique(y_imbalanced, return_counts=True)
    for cls, count in zip(unique, counts):
        print(f"Class {cls}: {count} samples ({count/len(y_imbalanced)*100:.1f}%)")
    
    # Compare plain K-fold with stratified K-fold
    k = 5
    
    # Plain K-fold
    kfold = KFold(n_splits=k, shuffle=True, random_state=42)
    
    # Stratified K-fold
    stratified_kfold = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
    
    # Analyze the class distribution within each fold
    print(f"\nClass distribution per fold ({k}-fold cross-validation):")
    print("Method\t\tFold\tClass 0\tClass 1\tClass 2")
    print("-" * 50)
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Plain K-Fold vs. Stratified K-Fold Cross-Validation', fontsize=16)
    
    # Plain K-fold distribution
    fold_distributions_normal = []
    for fold, (train_idx, val_idx) in enumerate(kfold.split(X_imbalanced, y_imbalanced)):
        val_y = y_imbalanced[val_idx]
        unique_val, counts_val = np.unique(val_y, return_counts=True)
        
        # Make sure every class has a count, even if it is absent from the fold
        distribution = np.zeros(3)
        for cls, count in zip(unique_val, counts_val):
            distribution[cls] = count
        
        fold_distributions_normal.append(distribution)
        print(f"Plain K-fold\t{fold+1}\t{distribution[0]:.0f}\t{distribution[1]:.0f}\t{distribution[2]:.0f}")
    
    # Stratified K-fold distribution
    fold_distributions_stratified = []
    for fold, (train_idx, val_idx) in enumerate(stratified_kfold.split(X_imbalanced, y_imbalanced)):
        val_y = y_imbalanced[val_idx]
        unique_val, counts_val = np.unique(val_y, return_counts=True)
        
        distribution = np.zeros(3)
        for cls, count in zip(unique_val, counts_val):
            distribution[cls] = count
        
        fold_distributions_stratified.append(distribution)
        print(f"Stratified\t{fold+1}\t{distribution[0]:.0f}\t{distribution[1]:.0f}\t{distribution[2]:.0f}")
    
    # Visualize the class distributions
    fold_distributions_normal = np.array(fold_distributions_normal)
    fold_distributions_stratified = np.array(fold_distributions_stratified)
    
    # Plain K-fold distribution
    x = np.arange(k)
    width = 0.25
    
    axes[0, 0].bar(x - width, fold_distributions_normal[:, 0], width, label='Class 0', alpha=0.7)
    axes[0, 0].bar(x, fold_distributions_normal[:, 1], width, label='Class 1', alpha=0.7)
    axes[0, 0].bar(x + width, fold_distributions_normal[:, 2], width, label='Class 2', alpha=0.7)
    axes[0, 0].set_title('Plain K-fold: class distribution per fold')
    axes[0, 0].set_xlabel('Fold')
    axes[0, 0].set_ylabel('Number of samples')
    axes[0, 0].set_xticks(x)
    axes[0, 0].set_xticklabels([f'Fold {i+1}' for i in range(k)])
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # Stratified K-fold distribution
    axes[0, 1].bar(x - width, fold_distributions_stratified[:, 0], width, label='Class 0', alpha=0.7)
    axes[0, 1].bar(x, fold_distributions_stratified[:, 1], width, label='Class 1', alpha=0.7)
    axes[0, 1].bar(x + width, fold_distributions_stratified[:, 2], width, label='Class 2', alpha=0.7)
    axes[0, 1].set_title('Stratified K-fold: class distribution per fold')
    axes[0, 1].set_xlabel('Fold')
    axes[0, 1].set_ylabel('Number of samples')
    axes[0, 1].set_xticks(x)
    axes[0, 1].set_xticklabels([f'Fold {i+1}' for i in range(k)])
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # Performance comparison
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    
    # Plain K-fold performance
    cv_scores_normal = cross_val_score(clf, X_imbalanced, y_imbalanced, 
                                       cv=kfold, scoring='accuracy')
    
    # Stratified K-fold performance
    cv_scores_stratified = cross_val_score(clf, X_imbalanced, y_imbalanced, 
                                           cv=stratified_kfold, scoring='accuracy')
    
    print(f"\nPerformance comparison:")
    print(f"Plain K-fold      - mean accuracy: {np.mean(cv_scores_normal):.4f} ± {np.std(cv_scores_normal):.4f}")
    print(f"Stratified K-fold - mean accuracy: {np.mean(cv_scores_stratified):.4f} ± {np.std(cv_scores_stratified):.4f}")
    
    # Visualize the performance comparison
    axes[1, 0].bar(range(1, k+1), cv_scores_normal, alpha=0.7, color='orange', label='Plain K-fold')
    axes[1, 0].axhline(y=np.mean(cv_scores_normal), color='red', linestyle='--', 
                       label=f'Mean: {np.mean(cv_scores_normal):.3f}')
    axes[1, 0].set_title('Plain K-fold performance')
    axes[1, 0].set_xlabel('Fold')
    axes[1, 0].set_ylabel('Accuracy')
    axes[1, 0].legend()
    axes[1, 0].grid(True, alpha=0.3)
    axes[1, 0].set_ylim(0.7, 1.0)
    
    axes[1, 1].bar(range(1, k+1), cv_scores_stratified, alpha=0.7, color='green', label='Stratified K-fold')
    axes[1, 1].axhline(y=np.mean(cv_scores_stratified), color='red', linestyle='--', 
                       label=f'Mean: {np.mean(cv_scores_stratified):.3f}')
    axes[1, 1].set_title('Stratified K-fold performance')
    axes[1, 1].set_xlabel('Fold')
    axes[1, 1].set_ylabel('Accuracy')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    axes[1, 1].set_ylim(0.7, 1.0)
    
    plt.tight_layout()
    plt.show()
    
    return cv_scores_normal, cv_scores_stratified

cv_scores_normal, cv_scores_stratified = stratified_k_fold_demo()
```

## 14.4 Other Cross-Validation Methods

### 14.4.1 Leave-One-Out and Leave-P-Out Cross-Validation

```python
def leave_one_out_demo():
    """Demonstrate leave-one-out and leave-P-out cross-validation."""
    
    print("Leave-One-Out and Leave-P-Out cross-validation:")
    print("=" * 45)
    
    # Use a small dataset for the demo.
    # Take every third sample so that all three classes are represented
    # (the first 50 iris samples would all belong to class 0).
    iris = load_iris()
    X_small = iris.data[::3]
    y_small = iris.target[::3]
    
    print(f"Small dataset shape: {X_small.shape}")
    
    # Cross-validation methods to compare.
    # Note: Leave-P-Out produces C(n, p) splits, so p is kept small here.
    cv_methods = {
        '5-fold CV': KFold(n_splits=5, shuffle=True, random_state=42),
        'Leave-One-Out CV': LeaveOneOut(),
        'Leave-2-Out CV': LeavePOut(p=2),
        'Leave-3-Out CV': LeavePOut(p=3)
    }
    
    clf = LogisticRegression(random_state=42, max_iter=1000)
    
    results = {}
    
    print(f"\nComparison of cross-validation methods:")
    print("Method\t\t\tSplits\tMean accuracy\tStd dev\t\tCompute time")
    print("-" * 70)
    
    import time
    
    for name, cv_method in cv_methods.items():
        start_time = time.time()
        
        # Compute the cross-validation scores
        cv_scores = cross_val_score(clf, X_small, y_small, cv=cv_method, scoring='accuracy')
        
        end_time = time.time()
        
        mean_score = np.mean(cv_scores)
        std_score = np.std(cv_scores)
        n_splits = len(cv_scores)
        compute_time = end_time - start_time
        
        results[name] = {
            'scores': cv_scores,
            'mean': mean_score,
            'std': std_score,
            'n_splits': n_splits,
            'time': compute_time
        }
        
        print(f"{name}\t{n_splits}\t{mean_score:.4f}\t\t{std_score:.4f}\t\t{compute_time:.4f}s")
    
    # Visualize the results
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Comparison of Cross-Validation Methods', fontsize=16)
    
    # Number of splits
    methods = list(results.keys())
    n_splits_list = [results[method]['n_splits'] for method in methods]
    
    axes[0, 0].bar(methods, n_splits_list, alpha=0.7, color='skyblue')
    axes[0, 0].set_title('Number of splits')
    axes[0, 0].set_ylabel('Splits')
    axes[0, 0].tick_params(axis='x', rotation=45)
    axes[0, 0].grid(True, alpha=0.3)
    
    # Compute time
    times = [results[method]['time'] for method in methods]
    
    axes[0, 1].bar(methods, times, alpha=0.7, color='lightcoral')
    axes[0, 1].set_title('Compute time')
    axes[0, 1].set_ylabel('Time (s)')
    axes[0, 1].tick_params(axis='x', rotation=45)
    axes[0, 1].grid(True, alpha=0.3)
    
    # Accuracy comparison
    means = [results[method]['mean'] for method in methods]
    stds = [results[method]['std'] for method in methods]
    
    axes[1, 0].bar(methods, means, yerr=stds, alpha=0.7, color='lightgreen', capsize=5)
    axes[1, 0].set_title('Mean accuracy (± std dev)')
    axes[1, 0].set_ylabel('Accuracy')
    axes[1, 0].tick_params(axis='x', rotation=45)
    axes[1, 0].grid(True, alpha=0.3)
    axes[1, 0].set_ylim(0.8, 1.0)
    
    # Standard deviation comparison
    axes[1, 1].bar(methods, stds, alpha=0.7, color='gold')
    axes[1, 1].set_title('Accuracy standard deviation')
    axes[1, 1].set_ylabel('Std dev')
    axes[1, 1].tick_params(axis='x', rotation=45)
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Examine the leave-one-out results in detail
    loo_scores = results['Leave-One-Out CV']['scores']
    print(f"\nLeave-one-out details:")
    print(f"  Total samples: {len(X_small)}")
    print(f"  Validation rounds: {len(loo_scores)}")
    print(f"  Correct predictions: {np.sum(loo_scores):.0f}")
    print(f"  Incorrect predictions: {len(loo_scores) - np.sum(loo_scores):.0f}")
    print(f"  Accuracy: {np.mean(loo_scores):.4f}")
    
    return results

loo_results = leave_one_out_demo()
```
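
To see why Leave-P-Out quickly becomes impractical, it helps to count the splits: with n samples it performs C(n, p) train/validate rounds. A quick sanity check (a small illustration, not from the original text):

```python
from math import comb

# Number of Leave-P-Out rounds for n = 50 samples
for p in [1, 2, 3, 5]:
    print(f"p = {p}: {comb(50, p):,} train/validate rounds")
# p = 5 alone would require over two million model fits
```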

### 14.4.2 Shuffle-Split Cross-Validation

```python
def shuffle_split_demo():
    """Demonstrate shuffle-split (random permutation) cross-validation."""
    
    print("Shuffle-split cross-validation:")
    print("Each round draws a random train/validation split; the split ratio and number of rounds are configurable")
    
    # Load the data
    wine = load_wine()
    X, y = wine.data, wine.target
    
    print(f"Dataset shape: {X.shape}")
    
    # Different shuffle-split strategies
    shuffle_methods = {
        'ShuffleSplit (80/20)': ShuffleSplit(n_splits=10, test_size=0.2, random_state=42),
        'ShuffleSplit (70/30)': ShuffleSplit(n_splits=10, test_size=0.3, random_state=42),
        'StratifiedShuffleSplit': StratifiedShuffleSplit(n_splits=10, test_size=0.2, random_state=42),
        '5-fold CV': StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    }
    
    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    
    results = {}
    
    print(f"\nComparison of shuffle-split strategies:")
    print("Method\t\t\tSplits\tMean accuracy\tStd dev")
    print("-" * 55)
    
    for name, cv_method in shuffle_methods.items():
        cv_scores = cross_val_score(clf, X, y, cv=cv_method, scoring='accuracy')
        
        mean_score = np.mean(cv_scores)
        std_score = np.std(cv_scores)
        n_splits = len(cv_scores)
        
        results[name] = {
            'scores': cv_scores,
            'mean': mean_score,
            'std': std_score,
            'n_splits': n_splits
        }
        
        print(f"{name}\t{n_splits}\t\t{mean_score:.4f}\t\t{std_score:.4f}")
    
    # Visualize the results
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Comparison of Shuffle-Split Cross-Validation', fontsize=16)
    
    # Score distribution for each method
    for i, (name, result) in enumerate(results.items()):
        row = i // 2
        col = i % 2
        
        scores = result['scores']
        mean_score = result['mean']
        
        axes[row, col].hist(scores, bins=10, alpha=0.7, color='skyblue', edgecolor='black')
        axes[row, col].axvline(x=mean_score, color='red', linestyle='--', 
                               label=f'Mean: {mean_score:.3f}')
        axes[row, col].set_title(name)
        axes[row, col].set_xlabel('Accuracy')
        axes[row, col].set_ylabel('Frequency')
        axes[row, col].legend()
        axes[row, col].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Assess the stability of shuffle-split estimates
    shuffle_split = ShuffleSplit(n_splits=50, test_size=0.2, random_state=42)
    stability_scores = cross_val_score(clf, X, y, cv=shuffle_split, scoring='accuracy')
    
    print(f"\nShuffle-split stability analysis (50 splits):")
    print(f"  Mean accuracy: {np.mean(stability_scores):.4f}")
    print(f"  Std dev: {np.std(stability_scores):.4f}")
    print(f"  Minimum: {np.min(stability_scores):.4f}")
    print(f"  Maximum: {np.max(stability_scores):.4f}")
    print(f"  Middle 95% of split scores: [{np.percentile(stability_scores, 2.5):.4f}, {np.percentile(stability_scores, 97.5):.4f}]")
    
    # Visualize the stability
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(range(1, 51), stability_scores, 'b-', alpha=0.7, linewidth=1)
    plt.axhline(y=np.mean(stability_scores), color='red', linestyle='--', 
                label=f'Mean: {np.mean(stability_scores):.3f}')
    plt.fill_between(range(1, 51), 
                     np.mean(stability_scores) - np.std(stability_scores),
                     np.mean(stability_scores) + np.std(stability_scores),
                     alpha=0.2, color='red', label='±1 std dev')
    plt.title('Accuracy across random splits')
    plt.xlabel('Split number')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 2, 2)
    plt.hist(stability_scores, bins=15, alpha=0.7, color='lightgreen', edgecolor='black')
    plt.axvline(x=np.mean(stability_scores), color='red', linestyle='--', 
                label=f'Mean: {np.mean(stability_scores):.3f}')
    plt.title('Accuracy distribution')
    plt.xlabel('Accuracy')
    plt.ylabel('Frequency')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return results, stability_scores

shuffle_results, stability_scores = shuffle_split_demo()
```
```

### 14.4.3 Time Series Cross-Validation

```python
def time_series_cv_demo():
    """Demonstrate time series cross-validation."""
    
    print("Time series cross-validation:")
    print("Respects temporal order, so future data is never used to predict the past")
    
    # Create a synthetic time series
    np.random.seed(42)
    n_samples = 200
    
    # Generate the time series components
    time_index = np.arange(n_samples)
    trend = 0.01 * time_index
    seasonal = 2 * np.sin(2 * np.pi * time_index / 50)
    noise = np.random.normal(0, 0.5, n_samples)
    
    # Target variable: trend + seasonality + noise
    y_ts = trend + seasonal + noise
    
    # Features: lagged values and a trailing moving average.
    # Every feature uses only past values, so nothing peeks at the future.
    trailing_ma = np.convolve(y_ts, np.ones(5)/5, mode='full')[:n_samples]
    X_ts = np.column_stack([
        np.roll(y_ts, 1),          # lag 1
        np.roll(y_ts, 2),          # lag 2
        np.roll(y_ts, 3),          # lag 3
        np.roll(trailing_ma, 1),   # moving average of the previous 5 values
        time_index,                # time trend
        np.sin(2 * np.pi * time_index / 50),  # seasonal features
        np.cos(2 * np.pi * time_index / 50)
    ])
    
    # Drop the first few samples (invalid because of the lags and window edge)
    X_ts = X_ts[5:]
    y_ts = y_ts[5:]
    time_index = time_index[5:]
    
    print(f"Time series data shape: {X_ts.shape}")
    
    # Cross-validation methods to compare
    cv_methods = {
        'Time series CV': TimeSeriesSplit(n_splits=5),
        'Plain K-fold CV': KFold(n_splits=5, shuffle=True, random_state=42),
        'ShuffleSplit CV': ShuffleSplit(n_splits=5, test_size=0.2, random_state=42)
    }
    
    from sklearn.linear_model import LinearRegression
    
    regressor = LinearRegression()
    
    results = {}
    
    print(f"\nTime series cross-validation results:")
    print("Method\t\t\tR² score\tRMSE")
    print("-" * 40)
    
    # Four rows: the first holds the raw series and the feature correlations,
    # the remaining three hold one CV method each
    fig, axes = plt.subplots(4, 2, figsize=(15, 20))
    fig.suptitle('Comparison of Time Series Cross-Validation', fontsize=16)
    
    # Raw time series
    axes[0, 0].plot(time_index, y_ts, 'b-', linewidth=1, alpha=0.7)
    axes[0, 0].set_title('Raw time series')
    axes[0, 0].set_xlabel('Time')
    axes[0, 0].set_ylabel('Value')
    axes[0, 0].grid(True, alpha=0.3)
    
    # Feature correlations
    feature_names = ['Lag 1', 'Lag 2', 'Lag 3', 'Moving avg', 'Trend', 'sin', 'cos']
    correlation_matrix = np.corrcoef(X_ts.T)
    
    im = axes[0, 1].imshow(correlation_matrix, cmap='coolwarm', vmin=-1, vmax=1)
    axes[0, 1].set_title('Feature correlation matrix')
    axes[0, 1].set_xticks(range(len(feature_names)))
    axes[0, 1].set_yticks(range(len(feature_names)))
    axes[0, 1].set_xticklabels(feature_names, rotation=45)
    axes[0, 1].set_yticklabels(feature_names)
    plt.colorbar(im, ax=axes[0, 1])
    
    for i, (name, cv_method) in enumerate(cv_methods.items()):
        # Cross-validation
        cv_scores_r2 = cross_val_score(regressor, X_ts, y_ts, cv=cv_method, scoring='r2')
        cv_scores_mse = -cross_val_score(regressor, X_ts, y_ts, cv=cv_method, scoring='neg_mean_squared_error')
        
        mean_r2 = np.mean(cv_scores_r2)
        mean_rmse = np.sqrt(np.mean(cv_scores_mse))
        
        results[name] = {
            'r2_scores': cv_scores_r2,
            'rmse_scores': np.sqrt(cv_scores_mse),
            'mean_r2': mean_r2,
            'mean_rmse': mean_rmse
        }
        
        print(f"{name}\t{mean_r2:.4f}\t\t{mean_rmse:.4f}")
        
        # Visualize the splits
        row = i + 1
        
        axes[row, 0].plot(time_index, y_ts, color='lightgray', linewidth=1, alpha=0.5, label='All data')
        
        colors = ['red', 'blue', 'green', 'orange', 'purple']
        
        for fold, (train_idx, val_idx) in enumerate(cv_method.split(X_ts)):
            axes[row, 0].scatter(time_index[train_idx], y_ts[train_idx], 
                                 c=colors[fold], alpha=0.3, s=10, label='Training set' if fold == 0 else "")
            axes[row, 0].scatter(time_index[val_idx], y_ts[val_idx], 
                                 c=colors[fold], alpha=0.8, s=20, marker='s', 
                                 label='Validation set' if fold == 0 else "")
        
        axes[row, 0].set_title(f'{name}: data splits')
        axes[row, 0].set_xlabel('Time')
        axes[row, 0].set_ylabel('Value')
        if row == 1:
            axes[row, 0].legend()
        axes[row, 0].grid(True, alpha=0.3)
        
        # Per-fold performance
        axes[row, 1].bar(range(1, len(cv_scores_r2)+1), cv_scores_r2, alpha=0.7, color='skyblue')
        axes[row, 1].axhline(y=mean_r2, color='red', linestyle='--', 
                             label=f'Mean R²: {mean_r2:.3f}')
        axes[row, 1].set_title(f'{name}: R² scores')
        axes[row, 1].set_xlabel('Fold')
        axes[row, 1].set_ylabel('R² score')
        axes[row, 1].legend()
        axes[row, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Why time series CV matters
    print(f"\nWhy time series cross-validation matters:")
    ts_cv_r2 = results['Time series CV']['mean_r2']
    kfold_cv_r2 = results['Plain K-fold CV']['mean_r2']
    
    print(f"  Time series CV R²: {ts_cv_r2:.4f}")
    print(f"  Plain K-fold CV R²: {kfold_cv_r2:.4f}")
    print(f"  Difference: {kfold_cv_r2 - ts_cv_r2:.4f}")
    
    if kfold_cv_r2 > ts_cv_r2:
        print(f"  Plain K-fold gives an overly optimistic estimate, a sign of temporal data leakage")
    else:
        print(f"  Time series CV gives a more conservative but more trustworthy estimate")
    
    return X_ts, y_ts, results

X_ts, y_ts, ts_results = time_series_cv_demo()
```

## 14.5 Cross-Validation for Model Selection

### 14.5.1 Model Comparison

```python
def model_comparison_with_cv():
    """Compare models using cross-validation."""
    
    print("Model comparison with cross-validation:")
    print("=" * 30)
    
    # Load the breast cancer dataset
    cancer = load_breast_cancer()
    X, y = cancer.data, cancer.target
    
    print(f"Dataset: {X.shape}")
    print(f"Class distribution: {np.bincount(y)}")
    
    # Candidate classifiers
    classifiers = {
        'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
        'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
        'SVM': SVC(random_state=42, probability=True),
        'KNN': KNeighborsClassifier(n_neighbors=5)
    }
    
    # Evaluation metrics
    scoring_metrics = ['accuracy', 'precision', 'recall', 'f1', 'roc_auc']
    
    # Cross-validation setup
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    
    # Store the results
    results = {}
    
    print(f"\nModel performance (5-fold cross-validation):")
    print("Model\t\tAccuracy\tPrecision\tRecall\t\tF1\t\tAUC")
    print("-" * 80)
    
    for name, clf in classifiers.items():
        # Scale-sensitive models get a pipeline with standardization
        if name in ['Logistic Regression', 'SVM', 'KNN']:
            pipeline = Pipeline([
                ('scaler', StandardScaler()),
                ('classifier', clf)
            ])
        else:
            pipeline = clf
        
        # Cross-validated scores for all metrics at once
        cv_results = cross_validate(pipeline, X, y, cv=cv, scoring=scoring_metrics)
        
        # Mean and standard deviation per metric
        model_results = {}
        for metric in scoring_metrics:
            scores = cv_results[f'test_{metric}']
            model_results[metric] = {
                'mean': np.mean(scores),
                'std': np.std(scores),
                'scores': scores
            }
        
        results[name] = model_results
        
        # Print the results
        print(f"{name}\t{model_results['accuracy']['mean']:.4f}±{model_results['accuracy']['std']:.3f}\t"
              f"{model_results['precision']['mean']:.4f}±{model_results['precision']['std']:.3f}\t"
              f"{model_results['recall']['mean']:.4f}±{model_results['recall']['std']:.3f}\t"
              f"{model_results['f1']['mean']:.4f}±{model_results['f1']['std']:.3f}\t"
              f"{model_results['roc_auc']['mean']:.4f}±{model_results['roc_auc']['std']:.3f}")
    
    # Visualize the results
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle('Model Performance Comparison', fontsize=16)
    
    models = list(results.keys())
    
    # Mean performance per metric
    for i, metric in enumerate(scoring_metrics):
        row = i // 3
        col = i % 3
        
        means = [results[model][metric]['mean'] for model in models]
        stds = [results[model][metric]['std'] for model in models]
        
        bars = axes[row, col].bar(models, means, yerr=stds, alpha=0.7, capsize=5)
        axes[row, col].set_title(f'{metric.upper()} comparison')
        axes[row, col].set_ylabel(metric.upper())
        axes[row, col].tick_params(axis='x', rotation=45)
        axes[row, col].grid(True, alpha=0.3)
        
        # Value labels on the bars
        for bar, mean, std in zip(bars, means, stds):
            height = bar.get_height()
            axes[row, col].text(bar.get_x() + bar.get_width()/2., height + std + 0.01,
                                f'{mean:.3f}', ha='center', va='bottom', fontsize=9)
    
    # Radar chart of overall performance
    axes[1, 2].remove()
    ax_radar = fig.add_subplot(2, 3, 6, projection='polar')
    
    # Prepare the radar chart data
    angles = np.linspace(0, 2 * np.pi, len(scoring_metrics), endpoint=False).tolist()
    angles += angles[:1]  # close the polygon
    
    colors = ['red', 'blue', 'green', 'orange']
    
    for i, model in enumerate(models):
        values = [results[model][metric]['mean'] for metric in scoring_metrics]
        values += values[:1]  # close the polygon
        
        ax_radar.plot(angles, values, 'o-', linewidth=2, label=model, color=colors[i])
        ax_radar.fill(angles, values, alpha=0.1, color=colors[i])
    
    ax_radar.set_xticks(angles[:-1])
    ax_radar.set_xticklabels([metric.upper() for metric in scoring_metrics])
    ax_radar.set_ylim(0, 1)
    ax_radar.set_title('Overall performance radar chart')
    ax_radar.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
    
    plt.tight_layout()
    plt.show()
    
    # Statistical significance tests
    print(f"\nStatistical comparison of model performance (paired t-test):")
    from scipy import stats
    
    best_model = max(models, key=lambda x: results[x]['accuracy']['mean'])
    print(f"Best model: {best_model} (accuracy: {results[best_model]['accuracy']['mean']:.4f})")
    
    print(f"\nComparison against the best model (p-values):")
    for model in models:
        if model != best_model:
            # Paired t-test over the per-fold scores
            best_scores = results[best_model]['accuracy']['scores']
            model_scores = results[model]['accuracy']['scores']
            
            t_stat, p_value = stats.ttest_rel(best_scores, model_scores)
            
            significance = "***" if p_value < 0.001 else "**" if p_value < 0.01 else "*" if p_value < 0.05 else ""
            print(f"{model} vs {best_model}: p = {p_value:.4f} {significance}")
    
    return results

model_comparison_results = model_comparison_with_cv()
```

### 14.5.2 Cross-Validation for Hyperparameter Tuning

```python
def hyperparameter_tuning_with_cv():
    """Tune hyperparameters with (nested) cross-validation."""
    
    print("Cross-validation for hyperparameter tuning:")
    print("=" * 30)
    
    # Use the wine dataset
    wine = load_wine()
    X, y = wine.data, wine.target
    
    # Standardize the data.
    # NOTE: scaling the full dataset up front is a mild form of leakage;
    # strictly, the scaler belongs inside a Pipeline (see Section 14.7.1).
    # It is done globally here only to keep the parameter-grid keys simple.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    print(f"Dataset: {X_scaled.shape}")
    
    # SVM hyperparameter grid
    svm_param_grid = {
        'C': [0.1, 1, 10, 100],
        'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1],
        'kernel': ['rbf', 'poly', 'sigmoid']
    }
    
    # Random forest hyperparameter grid
    rf_param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    }
    
    # Grid search setup
    cv_inner = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)  # inner CV
    cv_outer = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)  # outer CV
    
    models = {
        'SVM': (SVC(random_state=42), svm_param_grid),
        'Random Forest': (RandomForestClassifier(random_state=42), rf_param_grid)
    }
    
    results = {}
    
    print(f"\nNested cross-validation results:")
    print("Model\t\tBest parameters\t\t\t\tOuter CV score")
    print("-" * 80)
    
    for name, (model, param_grid) in models.items():
        # Grid search (inner loop)
        grid_search = GridSearchCV(
            model, param_grid, 
            cv=cv_inner, 
            scoring='accuracy',
            n_jobs=-1
        )
        
        # Outer cross-validation: each outer fold runs a full inner grid search
        outer_scores = cross_val_score(grid_search, X_scaled, y, cv=cv_outer, scoring='accuracy')
        
        # Refit on the full dataset to report the best parameters
        grid_search.fit(X_scaled, y)
        best_params = grid_search.best_params_
        best_score = grid_search.best_score_
        
        results[name] = {
            'best_params': best_params,
            'inner_cv_score': best_score,
            'outer_cv_scores': outer_scores,
            'outer_cv_mean': np.mean(outer_scores),
            'outer_cv_std': np.std(outer_scores)
        }
        
        print(f"{name}\t{str(best_params)[:40]:<40}\t{np.mean(outer_scores):.4f}±{np.std(outer_scores):.3f}")
    
    # Visualize the tuning process
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Hyperparameter Tuning Analysis', fontsize=16)
    
    # SVM parameter analysis
    svm_grid = GridSearchCV(SVC(random_state=42), svm_param_grid, cv=cv_inner, scoring='accuracy')
    svm_grid.fit(X_scaled, y)
    
    # Extract the RBF-kernel results for visualization
    rbf_results = []
    for params, score in zip(svm_grid.cv_results_['params'], svm_grid.cv_results_['mean_test_score']):
        if params['kernel'] == 'rbf':
            rbf_results.append((params['C'], params['gamma'], score))
    
    if rbf_results:
        rbf_df = pd.DataFrame(rbf_results, columns=['C', 'gamma', 'score'])
        
        # Build the heatmap grid (numeric gamma values only)
        C_values = sorted(rbf_df['C'].unique())
        gamma_values = sorted([g for g in rbf_df['gamma'].unique() if isinstance(g, float)])
        
        if gamma_values:
            heatmap_data = np.zeros((len(gamma_values), len(C_values)))
            
            for i, gamma in enumerate(gamma_values):
                for j, C in enumerate(C_values):
                    subset = rbf_df[(rbf_df['C'] == C) & (rbf_df['gamma'] == gamma)]
                    if not subset.empty:
                        heatmap_data[i, j] = subset['score'].iloc[0]
            
            im = axes[0, 0].imshow(heatmap_data, cmap='viridis', aspect='auto')
            axes[0, 0].set_title('SVM (RBF kernel) parameter heatmap')
            axes[0, 0].set_xlabel('C')
            axes[0, 0].set_ylabel('gamma')
            axes[0, 0].set_xticks(range(len(C_values)))
            axes[0, 0].set_yticks(range(len(gamma_values)))
            axes[0, 0].set_xticklabels(C_values)
            axes[0, 0].set_yticklabels([f'{g:.3f}' for g in gamma_values])
            plt.colorbar(im, ax=axes[0, 0], label='CV score')
    
    # Random forest parameter analysis
    rf_grid = GridSearchCV(RandomForestClassifier(random_state=42), rf_param_grid, cv=cv_inner, scoring='accuracy')
    rf_grid.fit(X_scaled, y)
    
    # n_estimators vs. max_depth
    rf_results = []
    for params, score in zip(rf_grid.cv_results_['params'], rf_grid.cv_results_['mean_test_score']):
        # Plot max_depth=None (unlimited) at x=40 so matplotlib can place it
        depth = 40 if params['max_depth'] is None else params['max_depth']
        rf_results.append((params['n_estimators'], depth, score))
    
    rf_df = pd.DataFrame(rf_results, columns=['n_estimators', 'max_depth', 'score'])
    
    # Group by n_estimators
    for n_est in [50, 100, 200]:
        subset = rf_df[rf_df['n_estimators'] == n_est]
        axes[0, 1].scatter(subset['max_depth'], subset['score'], 
                           label=f'n_estimators={n_est}', alpha=0.7, s=50)
    
    axes[0, 1].set_title('Random forest parameter analysis')
    axes[0, 1].set_xlabel('max_depth (40 = unlimited)')
    axes[0, 1].set_ylabel('CV score')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # Outer CV score comparison
    models_list = list(results.keys())
    outer_means = [results[model]['outer_cv_mean'] for model in models_list]
    outer_stds = [results[model]['outer_cv_std'] for model in models_list]
    
    bars = axes[1, 0].bar(models_list, outer_means, yerr=outer_stds, alpha=0.7, capsize=5)
    axes[1, 0].set_title('Nested cross-validation results')
    axes[1, 0].set_ylabel('Accuracy')
    axes[1, 0].grid(True, alpha=0.3)
    
    # Value labels
    for bar, mean, std in zip(bars, outer_means, outer_stds):
        height = bar.get_height()
        axes[1, 0].text(bar.get_x() + bar.get_width()/2., height + std + 0.01,
                        f'{mean:.3f}', ha='center', va='bottom')
    
    # Inner vs. outer CV scores
    inner_scores = [results[model]['inner_cv_score'] for model in models_list]
    
    x = np.arange(len(models_list))
    width = 0.35
    
    axes[1, 1].bar(x - width/2, inner_scores, width, label='Inner CV', alpha=0.7)
    axes[1, 1].bar(x + width/2, outer_means, width, label='Outer CV', alpha=0.7)
    axes[1, 1].set_title('Inner vs. outer CV scores')
    axes[1, 1].set_ylabel('Accuracy')
    axes[1, 1].set_xticks(x)
    axes[1, 1].set_xticklabels(models_list)
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Check for selection-induced optimism
    print(f"\nOverfitting analysis:")
    for name in models_list:
        inner_score = results[name]['inner_cv_score']
        outer_score = results[name]['outer_cv_mean']
        overfitting = inner_score - outer_score
        
        print(f"{name}:")
        print(f"  Inner CV score: {inner_score:.4f}")
        print(f"  Outer CV score: {outer_score:.4f}")
        print(f"  Optimism (inner - outer): {overfitting:.4f}")
        
        if overfitting > 0.02:
            print(f"  ⚠️  The inner score is likely over-optimistic")
        else:
            print(f"  ✅  Low risk of selection overfitting")
    
    return results

hyperparameter_results = hyperparameter_tuning_with_cv()
```
```

## 14.6 Learning Curves and Validation Curves

### 14.6.1 Learning Curve Analysis

```python
def learning_curve_analysis():
    """Learning curve analysis."""
    
    print("Learning curve analysis:")
    print("Examines how training-set size affects model performance")
    
    # Use the breast cancer dataset
    cancer = load_breast_cancer()
    X, y = cancer.data, cancer.target
    
    # Standardize
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # Models of different complexity
    models = {
        'Simple model': LogisticRegression(C=0.1, random_state=42, max_iter=1000),
        'Complex model': RandomForestClassifier(n_estimators=200, max_depth=None, random_state=42),
        'Moderate model': RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
    }
    
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    fig.suptitle('Learning Curves for Models of Different Complexity', fontsize=16)
    
    for i, (name, model) in enumerate(models.items()):
        # Compute the learning curve
        train_sizes, train_scores, val_scores = learning_curve(
            model, X_scaled, y,
            cv=5,
            n_jobs=-1,
            train_sizes=np.linspace(0.1, 1.0, 10),
            scoring='accuracy',
            random_state=42
        )
        
        # Means and standard deviations
        train_mean = np.mean(train_scores, axis=1)
        train_std = np.std(train_scores, axis=1)
        val_mean = np.mean(val_scores, axis=1)
        val_std = np.std(val_scores, axis=1)
        
        # Plot the learning curve
        axes[i].plot(train_sizes, train_mean, 'o-', color='blue', label='Training score')
        axes[i].fill_between(train_sizes, train_mean - train_std, train_mean + train_std, 
                             alpha=0.1, color='blue')
        
        axes[i].plot(train_sizes, val_mean, 'o-', color='red', label='Validation score')
        axes[i].fill_between(train_sizes, val_mean - val_std, val_mean + val_std, 
                             alpha=0.1, color='red')
        
        axes[i].set_title(name)
        axes[i].set_xlabel('Number of training samples')
        axes[i].set_ylabel('Accuracy')
        axes[i].legend()
        axes[i].grid(True, alpha=0.3)
        
        # Interpret the curve
        final_train_score = train_mean[-1]
        final_val_score = val_mean[-1]
        gap = final_train_score - final_val_score
        
        print(f"\n{name} learning curve analysis:")
        print(f"  Final training score: {final_train_score:.4f}")
        print(f"  Final validation score: {final_val_score:.4f}")
        print(f"  Train-validation gap: {gap:.4f}")
        
        if gap > 0.05:
            print(f"  Diagnosis: likely overfitting; consider more data or a simpler model")
        elif final_val_score < 0.85:
            print(f"  Diagnosis: likely underfitting; consider a more complex model")
        else:
            print(f"  Diagnosis: the model fits well")
    
    plt.tight_layout()
    plt.show()

learning_curve_analysis()
```

### 14.6.2 Validation Curve Analysis

```python
def validation_curve_analysis():
    """Validation curve analysis."""
    
    print("Validation curve analysis:")
    print("Examines how a single hyperparameter affects model performance")
    
    # Use the wine dataset
    wine = load_wine()
    X, y = wine.data, wine.target
    
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # Key parameters for several algorithms
    param_analyses = [
        {
            'model': RandomForestClassifier(random_state=42),
            'param_name': 'n_estimators',
            'param_range': [10, 50, 100, 200, 300, 500],
            'title': 'Random forest - number of trees'
        },
        {
            'model': SVC(random_state=42, kernel='rbf', gamma='scale'),
            'param_name': 'C',
            'param_range': np.logspace(-3, 3, 7),
            'title': 'SVM - regularization parameter C'
        },
        {
            'model': KNeighborsClassifier(),
            'param_name': 'n_neighbors',
            'param_range': range(1, 21),
            'title': 'KNN - number of neighbors K'
        }
    ]
    
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    fig.suptitle('Validation Curve Analysis', fontsize=16)
    
    for i, analysis in enumerate(param_analyses):
        model = analysis['model']
        param_name = analysis['param_name']
        param_range = analysis['param_range']
        title = analysis['title']
        
        # Scale-sensitive models (SVM, KNN) get standardized data; the forest uses raw data
        if 'SVM' in title or 'KNN' in title:
            X_input = X_scaled
        else:
            X_input = X
        
        # Compute the validation curve
        train_scores, val_scores = validation_curve(
            model, X_input, y,
            param_name=param_name,
            param_range=param_range,
            cv=5,
            scoring='accuracy',
            n_jobs=-1
        )
        
        # Means and standard deviations
        train_mean = np.mean(train_scores, axis=1)
        train_std = np.std(train_scores, axis=1)
        val_mean = np.mean(val_scores, axis=1)
        val_std = np.std(val_scores, axis=1)
        
        # Plot the validation curve (log scale for C)
        if param_name == 'C':
            axes[i].semilogx(param_range, train_mean, 'o-', color='blue', label='Training score')
            axes[i].fill_between(param_range, train_mean - train_std, train_mean + train_std, 
                                 alpha=0.1, color='blue')
            axes[i].semilogx(param_range, val_mean, 'o-', color='red', label='Validation score')
            axes[i].fill_between(param_range, val_mean - val_std, val_mean + val_std, 
                                 alpha=0.1, color='red')
        else:
            axes[i].plot(param_range, train_mean, 'o-', color='blue', label='Training score')
            axes[i].fill_between(param_range, train_mean - train_std, train_mean + train_std, 
                                 alpha=0.1, color='blue')
            axes[i].plot(param_range, val_mean, 'o-', color='red', label='Validation score')
            axes[i].fill_between(param_range, val_mean - val_std, val_mean + val_std, 
                                 alpha=0.1, color='red')
        
        axes[i].set_title(title)
        axes[i].set_xlabel(param_name)
        axes[i].set_ylabel('Accuracy')
        axes[i].legend()
        axes[i].grid(True, alpha=0.3)
        
        # Pick out the best parameter value
        best_idx = np.argmax(val_mean)
        best_param = param_range[best_idx]
        best_score = val_mean[best_idx]
        
        print(f"\n{title}:")
        print(f"  Best {param_name}: {best_param}")
        print(f"  Best validation score: {best_score:.4f}")
        print(f"  Corresponding training score: {train_mean[best_idx]:.4f}")
        print(f"  Overfitting gap: {train_mean[best_idx] - best_score:.4f}")
    
    plt.tight_layout()
    plt.show()

validation_curve_analysis()
```

## 14.7 Cross-Validation Best Practices

### 14.7.1 Avoiding Data Leakage

```python
def avoid_data_leakage_demo():
    """Demonstrate how to avoid data leakage in cross-validation."""
    
    print("Avoiding data leakage in cross-validation:")
    print("=" * 30)
    
    # Create data with missing values
    X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
    
    # Introduce missing values artificially
    missing_mask = np.random.random(X.shape) < 0.1
    X_with_missing = X.copy()
    X_with_missing[missing_mask] = np.nan
    
    print(f"Dataset shape: {X_with_missing.shape}")
    print(f"Fraction missing: {np.isnan(X_with_missing).sum() / X_with_missing.size * 100:.1f}%")
    
    from sklearn.impute import SimpleImputer
    
    # Wrong approach: preprocess before cross-validation
    print(f"\n❌ Wrong approach:")
    
    # Fit the imputer on the full dataset
    imputer_wrong = SimpleImputer(strategy='mean')
    X_imputed_wrong = imputer_wrong.fit_transform(X_with_missing)
    
    # Then cross-validate
    clf_wrong = LogisticRegression(random_state=42, max_iter=1000)
    cv_scores_wrong = cross_val_score(clf_wrong, X_imputed_wrong, y, cv=5)
    
    print(f"Wrong-approach CV score: {np.mean(cv_scores_wrong):.4f} ± {np.std(cv_scores_wrong):.4f}")
    print(f"Problem: information from the validation folds leaks into the imputed training values")
    
    # Correct approach: preprocess inside the cross-validation loop
    print(f"\n✅ Correct approach:")
    
    # Build a pipeline that contains the preprocessing
    pipeline_correct = Pipeline([
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler()),
        ('classifier', LogisticRegression(random_state=42, max_iter=1000))
    ])
    
    cv_scores_correct = cross_val_score(pipeline_correct, X_with_missing, y, cv=5)
    
    print(f"Correct-approach CV score: {np.mean(cv_scores_correct):.4f} ± {np.std(cv_scores_correct):.4f}")
    print(f"Advantage: in every fold, preprocessing is fitted on the training data only")
    
    # Visual comparison
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    fig.suptitle('Why Avoiding Data Leakage Matters', fontsize=16)
    
    # Missingness pattern
    axes[0].imshow(np.isnan(X_with_missing[:100, :10]), cmap='RdYlBu', aspect='auto')
    axes[0].set_title('Missingness pattern (first 100 samples, first 10 features)')
    axes[0].set_xlabel('Feature')
    axes[0].set_ylabel('Sample')
    
    # CV score comparison
    methods = ['Wrong approach', 'Correct approach']
    means = [np.mean(cv_scores_wrong), np.mean(cv_scores_correct)]
    stds = [np.std(cv_scores_wrong), np.std(cv_scores_correct)]
    
    bars = axes[1].bar(methods, means, yerr=stds, alpha=0.7, 
                       color=['red', 'green'], capsize=5)
    axes[1].set_title('Cross-validation score comparison')
    axes[1].set_ylabel('Accuracy')
    axes[1].grid(True, alpha=0.3)
    
    # Value labels
    for bar, mean, std in zip(bars, means, stds):
        height = bar.get_height()
        axes[1].text(bar.get_x() + bar.get_width()/2., height + std + 0.005,
                     f'{mean:.4f}', ha='center', va='bottom')
    
    # Per-fold score distributions
    axes[2].boxplot([cv_scores_wrong, cv_scores_correct], labels=methods)
    axes[2].set_title('Per-fold score distributions')
    axes[2].set_ylabel('Accuracy')
    axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Detailed analysis
    print(f"\nDetails:")
    print(f"Wrong approach per-fold scores:   {cv_scores_wrong}")
    print(f"Correct approach per-fold scores: {cv_scores_correct}")
    
    # Test whether the difference is statistically significant
    from scipy import stats
    t_stat, p_value = stats.ttest_rel(cv_scores_wrong, cv_scores_correct)
    print(f"\nPaired t-test:")
    print(f"  t statistic: {t_stat:.4f}")
    print(f"  p-value: {p_value:.4f}")
    
    if p_value < 0.05:
        print(f"  Conclusion: the difference between the two approaches is statistically significant")
    else:
        print(f"  Conclusion: the difference between the two approaches is not statistically significant")
    
    return cv_scores_wrong, cv_scores_correct

cv_scores_wrong, cv_scores_correct = avoid_data_leakage_demo()
```

### 14.7.2 Choosing a Cross-Validation Strategy

```python
def cv_strategy_selection_guide():
    """A guide to choosing a cross-validation strategy."""
    
    print("Cross-validation strategy selection guide:")
    print("=" * 25)
    
    strategies = {
        "K-fold CV": {
            "When to use": ["general classification/regression", "medium-sized datasets", "balanced classes"],
            "Recommended K": "5 or 10",
            "Pros": ["computationally efficient", "stable results", "widely used"],
            "Cons": ["may not preserve class proportions"],
            "Code": "KFold(n_splits=5, shuffle=True, random_state=42)"
        },
        
        "Stratified K-fold CV": {
            "When to use": ["classification", "imbalanced classes", "small datasets"],
            "Recommended K": "5 or 10",
            "Pros": ["preserves class proportions", "more reliable estimates"],
            "Cons": ["classification only"],
            "Code": "StratifiedKFold(n_splits=5, shuffle=True, random_state=42)"
        },
        
        "Leave-One-Out CV": {
            "When to use": ["small datasets (<100 samples)", "maximizing training data"],
            "Recommended K": "N/A",
            "Pros": ["uses the data fully", "no randomness"],
            "Cons": ["computationally expensive", "high variance"],
            "Code": "LeaveOneOut()"
        },
        
        "Shuffle-split CV": {
            "When to use": ["large datasets", "controlling validation-set size", "quick evaluation"],
            "Recommended K": "10-100 splits",
            "Pros": ["flexible split ratio", "scales to large data"],
            "Cons": ["samples may repeat across splits", "higher variance of results"],
            "Code": "ShuffleSplit(n_splits=10, test_size=0.2, random_state=42)"
        },
        
        "Time series CV": {
            "When to use": ["time series", "any temporally ordered data"],
            "Recommended K": "5-10",
            "Pros": ["avoids temporal leakage", "matches real deployment"],
            "Cons": ["training set grows fold by fold", "more complex to set up"],
            "Code": "TimeSeriesSplit(n_splits=5)"
        },
        
        "Group CV": {
            "When to use": ["data with group structure", "avoiding within-group leakage"],
            "Recommended K": "equal to the number of groups, or fewer",
            "Pros": ["avoids within-group leakage", "more realistic evaluation"],
            "Cons": ["requires group labels", "folds may be unbalanced"],
            "Code": "GroupKFold(n_splits=n_groups)"
        }
    }
    
    for strategy, info in strategies.items():
        print(f"\n{strategy}:")
        print("-" * len(strategy))
        for key, value in info.items():
            if isinstance(value, list):
                print(f"  {key}: {', '.join(value)}")
            else:
                print(f"  {key}: {value}")
    
    # Decision flow
    print(f"\nStrategy selection flow:")
    print("-" * 25)
    
    decision_flow = """
    What kind of data?
    ├─ Time series       → time series CV
    ├─ Grouped structure → group CV
    └─ Ordinary data     → continue below

    What kind of task?
    ├─ Classification
    │   ├─ Balanced classes   → K-fold CV
    │   └─ Imbalanced classes → stratified K-fold CV
    └─ Regression             → K-fold CV

    How large is the dataset?
    ├─ Small  (<100)       → leave-one-out CV
    ├─ Medium (100-10000)  → K-fold CV (K = 5 or 10)
    └─ Large  (>10000)     → shuffle-split CV

    Compute budget?
    ├─ Limited → shuffle-split CV (few splits)
    └─ Ample   → K-fold CV or leave-one-out CV
    """
    
    print(decision_flow)
    
    # Worked selection examples
    print(f"\nWorked examples:")
    print("-" * 15)
    
    examples = [
        {
            "Scenario": "Image classification, 10,000 samples, 10 balanced classes",
            "Recommendation": "5-fold cross-validation",
            "Why": "moderate data size, balanced classes, computationally efficient"
        },
        {
            "Scenario": "Medical diagnosis, 500 samples, 2 classes, severely imbalanced (5% positive)",
            "Recommendation": "stratified 5-fold cross-validation",
            "Why": "small dataset with imbalanced classes; the class ratio must be preserved"
        },
        {
            "Scenario": "Stock price prediction, time series data",
            "Recommendation": "time series cross-validation",
            "Why": "temporal data; future information must never be used"
        },
        {
            "Scenario": "Drug trial, data grouped by patient",
            "Recommendation": "group cross-validation",
            "Why": "data from one patient must not appear in both training and validation sets"
        }
    ]
    
    for i, example in enumerate(examples, 1):
        print(f"\nExample {i}: {example['Scenario']}")
        print(f"  Recommended strategy: {example['Recommendation']}")
        print(f"  Reason: {example['Why']}")

cv_strategy_selection_guide()
```
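
The guide above names `GroupKFold` without showing it in action, so here is a minimal sketch with synthetic "patient" groups (the data and group labels are invented for illustration; the imports come from Section 14.2). The one extra ingredient compared to ordinary K-fold is the `groups` array passed to `split()` and `cross_val_score()`.

```python
# Minimal GroupKFold sketch: 300 samples from 30 synthetic "patients"
X_grp, y_grp = make_classification(n_samples=300, n_features=10, random_state=42)
groups = np.repeat(np.arange(30), 10)  # 10 samples per patient

group_cv = GroupKFold(n_splits=5)
clf_grp = RandomForestClassifier(n_estimators=100, random_state=42)

# cross_val_score accepts the group labels via the `groups` argument
scores = cross_val_score(clf_grp, X_grp, y_grp, cv=group_cv, groups=groups)
print(f"GroupKFold accuracy: {np.mean(scores):.4f} ± {np.std(scores):.4f}")

# Sanity check: no group ever appears in both training and validation sets
for fold, (tr, va) in enumerate(group_cv.split(X_grp, y_grp, groups=groups)):
    assert set(groups[tr]).isdisjoint(groups[va])
    print(f"Fold {fold + 1}: {len(set(groups[va]))} held-out patients")
```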

## 14.8 Exercises

### Exercise 1: Basic Cross-Validation

1. Using the iris dataset, compare the results of 3-fold, 5-fold, and 10-fold cross-validation
2. Analyze how the choice of K affects the stability of the performance estimate
3. Compute a confidence interval for each method

### Exercise 2: Stratified Cross-Validation

1. Create a severely imbalanced classification dataset
2. Compare plain K-fold with stratified K-fold cross-validation
3. Analyze how class imbalance affects cross-validation

### Exercise 3: Learning Curve Analysis

1. Use models of different complexity (e.g., decision trees of different depths)
2. Plot learning curves and diagnose overfitting/underfitting
3. Determine the best model complexity

### Exercise 4: Validation Curves

1. Pick one algorithm (e.g., random forest or SVM)
2. Analyze how its key hyperparameter affects performance
3. Use a validation curve to find the best parameter value

### Exercise 5: Nested Cross-Validation

1. Implement a complete nested cross-validation workflow
2. Compare several algorithms and parameter combinations
3. Analyze the gap between the inner and outer CV scores

## 14.9 Summary

In this chapter we studied cross-validation in depth:

### Core Concepts

- **How cross-validation works**: repeated data splits yield reliable performance estimates
- **Avoiding overfitting**: guards against overly optimistic performance evaluation
- **Model selection**: choosing the best model based on cross-validation results

### Main Methods

- **K-fold cross-validation**: the most common method
- **Stratified cross-validation**: a classification-specific method that preserves class proportions
- **Leave-one-out cross-validation**: an extreme variant for small datasets
- **Time series cross-validation**: a special method that respects temporal order

### Practical Skills

- **Method selection**: choosing a cross-validation strategy that fits the data
- **Avoiding data leakage**: correct preprocessing and validation workflow
- **Performance analysis**: reading learning curves and validation curves
- **Statistical testing**: assessing the significance of model differences

### Key Takeaways

- Cross-validation is the standard way to estimate generalization performance
- Different data types call for different cross-validation strategies
- Preprocessing must happen inside the cross-validation loop
- Nested cross-validation gives unbiased model selection

### Best Practices

1. Choose the right strategy
   - Prefer stratified cross-validation for classification
   - Time series data must use time series cross-validation
   - Small datasets may use leave-one-out cross-validation
2. Avoid data leakage
   - Put preprocessing steps inside the cross-validation loop
   - Use a Pipeline to enforce the correct workflow
   - Feature selection must also happen inside the CV loop (see the sketch after this list)
3. Interpret results properly
   - Report both the mean and the standard deviation
   - Use confidence intervals
   - Run statistical significance tests
4. Mind computational cost
   - Choose the number of folds to match the dataset size
   - Use parallelism (n_jobs) to speed things up
   - Respect available compute resources
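
The list above states that feature selection, like any other preprocessing, belongs inside the CV loop, but the chapter never showed that case. A minimal sketch (synthetic data; `SelectKBest` with k=10 is an arbitrary illustrative choice, and the other imports come from Section 14.2):

```python
from sklearn.feature_selection import SelectKBest, f_classif

# 50 features, only 5 informative: a tempting target for selection leakage
X_fs, y_fs = make_classification(n_samples=300, n_features=50,
                                 n_informative=5, random_state=42)

# Selecting features inside the pipeline means each fold picks its own
# top-k features from its training data alone
pipe_fs = Pipeline([
    ('select', SelectKBest(f_classif, k=10)),
    ('clf', LogisticRegression(max_iter=1000, random_state=42))
])
scores_fs = cross_val_score(pipe_fs, X_fs, y_fs, cv=5)
print(f"CV accuracy with in-fold feature selection: {np.mean(scores_fs):.4f}")
```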

## 14.10 What's Next

You have now mastered cross-validation, a cornerstone of model evaluation! In the next chapter on hyperparameter tuning, we will learn how to optimize the hyperparameters of machine learning models systematically to push performance further.


**Chapter recap**

- ✅ Understood the principles and importance of cross-validation
- ✅ Learned when to use each cross-validation method
- ✅ Learned the correct way to avoid data leakage
- ✅ Learned how to analyze learning curves and validation curves
- ✅ Learned a sound methodology for comparing and selecting models
- ✅ Can choose a cross-validation strategy that fits the data
