Skip to content

第13章:异常检测

异常检测是识别数据中不符合预期模式的观测值的过程。这些异常点可能表示错误、欺诈、系统故障或其他重要事件。本章将详细介绍各种异常检测算法的原理、实现和应用。

13.1 什么是异常检测?

异常检测(Anomaly Detection),也称为离群点检测(Outlier Detection),是识别与大多数数据显著不同的数据点的过程。

13.1.1 异常的类型

  • 点异常:单个数据点异常
  • 上下文异常:在特定上下文中异常
  • 集体异常:一组数据点共同表现异常

13.1.2 异常检测的应用

  • 欺诈检测:信用卡交易、保险理赔
  • 网络安全:入侵检测、恶意软件识别
  • 质量控制:制造业缺陷检测
  • 医疗诊断:疾病筛查、异常生理指标
  • 系统监控:服务器性能、网络流量

13.1.3 异常检测的挑战

  • 标签稀缺:异常样本通常很少
  • 类别不平衡:正常样本远多于异常样本
  • 概念漂移:异常模式可能随时间变化
  • 高维数据:维度诅咒影响检测效果

13.2 准备环境和数据

python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_blobs, make_classification
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')

# 设置随机种子
np.random.seed(42)

# 设置图形样式
plt.style.use('seaborn-v0_8')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

13.3 统计方法

13.3.1 基于距离的异常检测

python
def statistical_anomaly_detection():
    """演示基于统计的异常检测方法"""
    
    print("基于统计的异常检测方法:")
    print("=" * 30)
    
    # 创建正常数据
    np.random.seed(42)
    normal_data = np.random.multivariate_normal([0, 0], [[1, 0.5], [0.5, 1]], 200)
    
    # 添加异常点
    outliers = np.array([[4, 4], [-4, -4], [4, -4], [-4, 4], [0, 5], [5, 0]])
    
    # 合并数据
    X_combined = np.vstack([normal_data, outliers])
    y_true = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    
    print(f"数据集大小: {len(X_combined)}")
    print(f"正常样本: {len(normal_data)}")
    print(f"异常样本: {len(outliers)}")
    
    # 方法1: Z-score方法
    def zscore_anomaly_detection(X, threshold=2.5):
        """基于Z-score的异常检测"""
        mean = np.mean(X, axis=0)
        std = np.std(X, axis=0)
        z_scores = np.abs((X - mean) / std)
        # 如果任一维度的Z-score超过阈值,则认为是异常
        anomalies = np.any(z_scores > threshold, axis=1)
        return anomalies.astype(int)
    
    # 方法2: 马氏距离方法
    def mahalanobis_anomaly_detection(X, threshold=2.5):
        """基于马氏距离的异常检测"""
        mean = np.mean(X, axis=0)
        cov = np.cov(X.T)
        
        # 计算马氏距离
        diff = X - mean
        mahal_dist = np.sqrt(np.sum(diff @ np.linalg.inv(cov) * diff, axis=1))
        
        # 基于阈值判断异常
        anomalies = mahal_dist > threshold
        return anomalies.astype(int)
    
    # 方法3: 四分位距(IQR)方法
    def iqr_anomaly_detection(X, factor=1.5):
        """基于IQR的异常检测"""
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        IQR = Q3 - Q1
        
        lower_bound = Q1 - factor * IQR
        upper_bound = Q3 + factor * IQR
        
        # 如果任一维度超出边界,则认为是异常
        anomalies = np.any((X < lower_bound) | (X > upper_bound), axis=1)
        return anomalies.astype(int)
    
    # 应用不同方法
    methods = {
        'Z-score': zscore_anomaly_detection(X_combined),
        '马氏距离': mahalanobis_anomaly_detection(X_combined),
        'IQR': iqr_anomaly_detection(X_combined)
    }
    
    # 可视化结果
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('基于统计的异常检测方法', fontsize=16)
    
    # 原始数据
    axes[0, 0].scatter(normal_data[:, 0], normal_data[:, 1], 
                      c='blue', alpha=0.6, label='正常点', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1], 
                      c='red', alpha=0.8, label='真实异常点', s=100, marker='x')
    axes[0, 0].set_title('原始数据')
    axes[0, 0].set_xlabel('特征 1')
    axes[0, 0].set_ylabel('特征 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # 不同方法的结果
    method_names = list(methods.keys())
    for i, (method_name, predictions) in enumerate(methods.items()):
        row = (i + 1) // 2
        col = (i + 1) % 2
        
        # 计算性能指标
        from sklearn.metrics import precision_score, recall_score, f1_score
        precision = precision_score(y_true, predictions)
        recall = recall_score(y_true, predictions)
        f1 = f1_score(y_true, predictions)
        
        print(f"\n{method_name}方法:")
        print(f"  精确率: {precision:.3f}")
        print(f"  召回率: {recall:.3f}")
        print(f"  F1得分: {f1:.3f}")
        
        # 可视化
        normal_mask = predictions == 0
        anomaly_mask = predictions == 1
        
        axes[row, col].scatter(X_combined[normal_mask, 0], X_combined[normal_mask, 1], 
                              c='blue', alpha=0.6, label='预测正常', s=30)
        axes[row, col].scatter(X_combined[anomaly_mask, 0], X_combined[anomaly_mask, 1], 
                              c='red', alpha=0.8, label='预测异常', s=60, marker='s')
        
        axes[row, col].set_title(f'{method_name} (F1={f1:.3f})')
        axes[row, col].set_xlabel('特征 1')
        axes[row, col].set_ylabel('特征 2')
        axes[row, col].legend()
        axes[row, col].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return X_combined, y_true, methods

X_combined, y_true, statistical_methods = statistical_anomaly_detection()
```##
# 13.3.2 椭圆包络法

```python
def elliptic_envelope_demo():
    """演示椭圆包络异常检测"""
    
    print("椭圆包络异常检测:")
    print("假设正常数据服从多元高斯分布")
    
    # 使用之前的数据
    # 椭圆包络法
    elliptic_env = EllipticEnvelope(contamination=0.1, random_state=42)
    y_pred_elliptic = elliptic_env.fit_predict(X_combined)
    
    # 转换预测结果 (-1 -> 1, 1 -> 0)
    y_pred_elliptic_binary = (y_pred_elliptic == -1).astype(int)
    
    # 获取异常分数
    anomaly_scores = elliptic_env.decision_function(X_combined)
    
    # 计算性能指标
    from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
    
    precision = precision_score(y_true, y_pred_elliptic_binary)
    recall = recall_score(y_true, y_pred_elliptic_binary)
    f1 = f1_score(y_true, y_pred_elliptic_binary)
    accuracy = accuracy_score(y_true, y_pred_elliptic_binary)
    
    print(f"椭圆包络法性能:")
    print(f"  准确率: {accuracy:.3f}")
    print(f"  精确率: {precision:.3f}")
    print(f"  召回率: {recall:.3f}")
    print(f"  F1得分: {f1:.3f}")
    
    # 可视化
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    
    # 原始数据和预测结果
    normal_mask = y_pred_elliptic_binary == 0
    anomaly_mask = y_pred_elliptic_binary == 1
    
    axes[0].scatter(X_combined[normal_mask, 0], X_combined[normal_mask, 1], 
                   c='blue', alpha=0.6, label='预测正常', s=30)
    axes[0].scatter(X_combined[anomaly_mask, 0], X_combined[anomaly_mask, 1], 
                   c='red', alpha=0.8, label='预测异常', s=60, marker='s')
    
    # 绘制椭圆边界
    xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = elliptic_env.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    
    axes[0].contour(xx, yy, Z, levels=[0], colors='black', linestyles='--', linewidths=2)
    axes[0].set_title('椭圆包络异常检测')
    axes[0].set_xlabel('特征 1')
    axes[0].set_ylabel('特征 2')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 异常分数分布
    axes[1].hist(anomaly_scores[y_true == 0], bins=20, alpha=0.6, 
                label='正常样本', color='blue', density=True)
    axes[1].hist(anomaly_scores[y_true == 1], bins=20, alpha=0.6, 
                label='异常样本', color='red', density=True)
    axes[1].axvline(x=0, color='black', linestyle='--', alpha=0.7, label='决策边界')
    axes[1].set_title('异常分数分布')
    axes[1].set_xlabel('异常分数')
    axes[1].set_ylabel('密度')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    # ROC曲线
    # 需要将异常分数转换为概率(分数越低越异常)
    anomaly_probs = -anomaly_scores  # 转换符号
    fpr, tpr, _ = roc_curve(y_true, anomaly_probs)
    auc_score = roc_auc_score(y_true, anomaly_probs)
    
    axes[2].plot(fpr, tpr, linewidth=2, label=f'ROC曲线 (AUC = {auc_score:.3f})')
    axes[2].plot([0, 1], [0, 1], 'k--', alpha=0.7, label='随机分类器')
    axes[2].set_title('ROC曲线')
    axes[2].set_xlabel('假正例率')
    axes[2].set_ylabel('真正例率')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return elliptic_env, anomaly_scores

elliptic_env, anomaly_scores = elliptic_envelope_demo()

13.4 基于机器学习的方法

13.4.1 孤立森林

python
def isolation_forest_demo():
    """演示孤立森林异常检测"""
    
    print("孤立森林异常检测:")
    print("基于随机分割的思想,异常点更容易被孤立")
    
    # 创建更复杂的数据集
    np.random.seed(42)
    
    # 正常数据:两个聚类
    cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0.1], [0.1, 0.5]], 100)
    cluster2 = np.random.multivariate_normal([-2, -2], [[0.5, -0.1], [-0.1, 0.5]], 100)
    normal_data = np.vstack([cluster1, cluster2])
    
    # 异常数据:随机分布
    outliers = np.random.uniform(-5, 5, (20, 2))
    
    # 合并数据
    X_iso = np.vstack([normal_data, outliers])
    y_true_iso = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    
    print(f"数据集大小: {len(X_iso)}")
    print(f"正常样本: {len(normal_data)}")
    print(f"异常样本: {len(outliers)}")
    
    # 孤立森林
    iso_forest = IsolationForest(
        contamination=0.1,  # 预期异常比例
        random_state=42,
        n_estimators=100
    )
    
    y_pred_iso = iso_forest.fit_predict(X_iso)
    y_pred_iso_binary = (y_pred_iso == -1).astype(int)
    
    # 获取异常分数
    anomaly_scores_iso = iso_forest.decision_function(X_iso)
    
    # 性能评估
    precision = precision_score(y_true_iso, y_pred_iso_binary)
    recall = recall_score(y_true_iso, y_pred_iso_binary)
    f1 = f1_score(y_true_iso, y_pred_iso_binary)
    
    print(f"\n孤立森林性能:")
    print(f"  精确率: {precision:.3f}")
    print(f"  召回率: {recall:.3f}")
    print(f"  F1得分: {f1:.3f}")
    
    # 可视化
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('孤立森林异常检测', fontsize=16)
    
    # 原始数据
    axes[0, 0].scatter(normal_data[:, 0], normal_data[:, 1], 
                      c='blue', alpha=0.6, label='正常点', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1], 
                      c='red', alpha=0.8, label='真实异常点', s=100, marker='x')
    axes[0, 0].set_title('原始数据')
    axes[0, 0].set_xlabel('特征 1')
    axes[0, 0].set_ylabel('特征 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # 孤立森林预测结果
    normal_mask = y_pred_iso_binary == 0
    anomaly_mask = y_pred_iso_binary == 1
    
    axes[0, 1].scatter(X_iso[normal_mask, 0], X_iso[normal_mask, 1], 
                      c='blue', alpha=0.6, label='预测正常', s=30)
    axes[0, 1].scatter(X_iso[anomaly_mask, 0], X_iso[anomaly_mask, 1], 
                      c='red', alpha=0.8, label='预测异常', s=60, marker='s')
    axes[0, 1].set_title(f'孤立森林预测 (F1={f1:.3f})')
    axes[0, 1].set_xlabel('特征 1')
    axes[0, 1].set_ylabel('特征 2')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # 异常分数热力图
    xx, yy = np.meshgrid(np.linspace(-6, 6, 50), np.linspace(-6, 6, 50))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = iso_forest.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    
    contour = axes[1, 0].contourf(xx, yy, Z, levels=20, cmap='RdYlBu', alpha=0.7)
    axes[1, 0].scatter(X_iso[:, 0], X_iso[:, 1], c=y_true_iso, 
                      cmap='RdYlBu', edgecolors='black', s=50)
    axes[1, 0].set_title('异常分数热力图')
    axes[1, 0].set_xlabel('特征 1')
    axes[1, 0].set_ylabel('特征 2')
    plt.colorbar(contour, ax=axes[1, 0], label='异常分数')
    
    # 异常分数分布
    axes[1, 1].hist(anomaly_scores_iso[y_true_iso == 0], bins=20, alpha=0.6, 
                   label='正常样本', color='blue', density=True)
    axes[1, 1].hist(anomaly_scores_iso[y_true_iso == 1], bins=20, alpha=0.6, 
                   label='异常样本', color='red', density=True)
    axes[1, 1].set_title('异常分数分布')
    axes[1, 1].set_xlabel('异常分数')
    axes[1, 1].set_ylabel('密度')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return X_iso, y_true_iso, iso_forest

X_iso, y_true_iso, iso_forest = isolation_forest_demo()

13.4.2 局部异常因子 (LOF)

python
def local_outlier_factor_demo():
    """演示局部异常因子异常检测"""
    
    print("局部异常因子 (LOF) 异常检测:")
    print("基于局部密度的异常检测,适合检测局部异常")
    
    # 创建具有不同密度区域的数据
    np.random.seed(42)
    
    # 高密度区域
    dense_cluster = np.random.multivariate_normal([0, 0], [[0.2, 0], [0, 0.2]], 80)
    
    # 低密度区域
    sparse_cluster = np.random.multivariate_normal([4, 4], [[1, 0], [0, 1]], 40)
    
    # 正常数据
    normal_data = np.vstack([dense_cluster, sparse_cluster])
    
    # 异常点:在密集区域中的稀疏点,在稀疏区域中的密集点
    outliers = np.array([[0, 3], [3, 0], [-3, 0], [0, -3], [7, 7], [1, 7]])
    
    # 合并数据
    X_lof = np.vstack([normal_data, outliers])
    y_true_lof = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
    
    print(f"数据集大小: {len(X_lof)}")
    print(f"正常样本: {len(normal_data)}")
    print(f"异常样本: {len(outliers)}")
    
    # LOF异常检测
    lof = LocalOutlierFactor(
        n_neighbors=20,
        contamination=0.1,
        novelty=False  # 用于训练数据的异常检测
    )
    
    y_pred_lof = lof.fit_predict(X_lof)
    y_pred_lof_binary = (y_pred_lof == -1).astype(int)
    
    # 获取LOF分数
    lof_scores = -lof.negative_outlier_factor_  # 转换为正值,值越大越异常
    
    # 性能评估
    precision = precision_score(y_true_lof, y_pred_lof_binary)
    recall = recall_score(y_true_lof, y_pred_lof_binary)
    f1 = f1_score(y_true_lof, y_pred_lof_binary)
    
    print(f"\nLOF性能:")
    print(f"  精确率: {precision:.3f}")
    print(f"  召回率: {recall:.3f}")
    print(f"  F1得分: {f1:.3f}")
    
    # 可视化
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('局部异常因子 (LOF) 异常检测', fontsize=16)
    
    # 原始数据
    axes[0, 0].scatter(dense_cluster[:, 0], dense_cluster[:, 1], 
                      c='lightblue', alpha=0.6, label='高密度区域', s=30)
    axes[0, 0].scatter(sparse_cluster[:, 0], sparse_cluster[:, 1], 
                      c='lightgreen', alpha=0.6, label='低密度区域', s=30)
    axes[0, 0].scatter(outliers[:, 0], outliers[:, 1], 
                      c='red', alpha=0.8, label='真实异常点', s=100, marker='x')
    axes[0, 0].set_title('原始数据(不同密度区域)')
    axes[0, 0].set_xlabel('特征 1')
    axes[0, 0].set_ylabel('特征 2')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # LOF预测结果
    normal_mask = y_pred_lof_binary == 0
    anomaly_mask = y_pred_lof_binary == 1
    
    axes[0, 1].scatter(X_lof[normal_mask, 0], X_lof[normal_mask, 1], 
                      c='blue', alpha=0.6, label='预测正常', s=30)
    axes[0, 1].scatter(X_lof[anomaly_mask, 0], X_lof[anomaly_mask, 1], 
                      c='red', alpha=0.8, label='预测异常', s=60, marker='s')
    axes[0, 1].set_title(f'LOF预测结果 (F1={f1:.3f})')
    axes[0, 1].set_xlabel('特征 1')
    axes[0, 1].set_ylabel('特征 2')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # LOF分数可视化
    scatter = axes[1, 0].scatter(X_lof[:, 0], X_lof[:, 1], c=lof_scores, 
                                cmap='Reds', s=50, alpha=0.7)
    axes[1, 0].set_title('LOF分数可视化')
    axes[1, 0].set_xlabel('特征 1')
    axes[1, 0].set_ylabel('特征 2')
    plt.colorbar(scatter, ax=axes[1, 0], label='LOF分数')
    
    # LOF分数分布
    axes[1, 1].hist(lof_scores[y_true_lof == 0], bins=20, alpha=0.6, 
                   label='正常样本', color='blue', density=True)
    axes[1, 1].hist(lof_scores[y_true_lof == 1], bins=20, alpha=0.6, 
                   label='异常样本', color='red', density=True)
    axes[1, 1].set_title('LOF分数分布')
    axes[1, 1].set_xlabel('LOF分数')
    axes[1, 1].set_ylabel('密度')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return X_lof, y_true_lof, lof

X_lof, y_true_lof, lof = local_outlier_factor_demo()

13.4.3 One-Class SVM

python
def one_class_svm_demo():
    """演示One-Class SVM异常检测"""
    
    print("One-Class SVM异常检测:")
    print("学习正常数据的边界,将边界外的点视为异常")
    
    # 使用之前的数据
    # One-Class SVM
    oc_svm = OneClassSVM(
        kernel='rbf',
        gamma='scale',
        nu=0.1  # 预期异常比例的上界
    )
    
    y_pred_svm = oc_svm.fit_predict(X_iso)
    y_pred_svm_binary = (y_pred_svm == -1).astype(int)
    
    # 获取决策分数
    decision_scores = oc_svm.decision_function(X_iso)
    
    # 性能评估
    precision = precision_score(y_true_iso, y_pred_svm_binary)
    recall = recall_score(y_true_iso, y_pred_svm_binary)
    f1 = f1_score(y_true_iso, y_pred_svm_binary)
    
    print(f"\nOne-Class SVM性能:")
    print(f"  精确率: {precision:.3f}")
    print(f"  召回率: {recall:.3f}")
    print(f"  F1得分: {f1:.3f}")
    
    # 可视化
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    
    # 预测结果
    normal_mask = y_pred_svm_binary == 0
    anomaly_mask = y_pred_svm_binary == 1
    
    axes[0].scatter(X_iso[normal_mask, 0], X_iso[normal_mask, 1], 
                   c='blue', alpha=0.6, label='预测正常', s=30)
    axes[0].scatter(X_iso[anomaly_mask, 0], X_iso[anomaly_mask, 1], 
                   c='red', alpha=0.8, label='预测异常', s=60, marker='s')
    
    # 绘制决策边界
    xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = oc_svm.decision_function(grid_points)
    Z = Z.reshape(xx.shape)
    
    axes[0].contour(xx, yy, Z, levels=[0], colors='black', linestyles='--', linewidths=2)
    axes[0].set_title(f'One-Class SVM (F1={f1:.3f})')
    axes[0].set_xlabel('特征 1')
    axes[0].set_ylabel('特征 2')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 决策分数热力图
    contour = axes[1].contourf(xx, yy, Z, levels=20, cmap='RdYlBu', alpha=0.7)
    axes[1].scatter(X_iso[:, 0], X_iso[:, 1], c=y_true_iso, 
                   cmap='RdYlBu', edgecolors='black', s=50)
    axes[1].contour(xx, yy, Z, levels=[0], colors='black', linestyles='-', linewidths=2)
    axes[1].set_title('决策边界和分数')
    axes[1].set_xlabel('特征 1')
    axes[1].set_ylabel('特征 2')
    plt.colorbar(contour, ax=axes[1], label='决策分数')
    
    # 决策分数分布
    axes[2].hist(decision_scores[y_true_iso == 0], bins=20, alpha=0.6, 
                label='正常样本', color='blue', density=True)
    axes[2].hist(decision_scores[y_true_iso == 1], bins=20, alpha=0.6, 
                label='异常样本', color='red', density=True)
    axes[2].axvline(x=0, color='black', linestyle='--', alpha=0.7, label='决策边界')
    axes[2].set_title('决策分数分布')
    axes[2].set_xlabel('决策分数')
    axes[2].set_ylabel('密度')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return oc_svm, decision_scores

oc_svm, decision_scores = one_class_svm_demo()
```## 13.
5 算法比较和选择

### 13.5.1 综合性能比较

```python
def comprehensive_anomaly_detection_comparison():
    """综合比较不同异常检测算法"""
    
    print("异常检测算法综合比较:")
    print("=" * 30)
    
    # 创建多种类型的数据集
    datasets = []
    
    # 1. 球形聚类数据
    np.random.seed(42)
    normal_spherical = np.random.multivariate_normal([0, 0], [[1, 0], [0, 1]], 150)
    outliers_spherical = np.random.uniform(-4, 4, (15, 2))
    X_spherical = np.vstack([normal_spherical, outliers_spherical])
    y_spherical = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('球形聚类', X_spherical, y_spherical))
    
    # 2. 椭圆聚类数据
    normal_elliptical = np.random.multivariate_normal([0, 0], [[2, 1.5], [1.5, 2]], 150)
    outliers_elliptical = np.array([[5, 5], [-5, -5], [5, -5], [-5, 5], [0, 6], [6, 0]])
    outliers_elliptical = np.vstack([outliers_elliptical, np.random.uniform(-6, 6, (9, 2))])
    X_elliptical = np.vstack([normal_elliptical, outliers_elliptical])
    y_elliptical = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('椭圆聚类', X_elliptical, y_elliptical))
    
    # 3. 多聚类数据
    cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0], [0, 0.5]], 50)
    cluster2 = np.random.multivariate_normal([-2, -2], [[0.5, 0], [0, 0.5]], 50)
    cluster3 = np.random.multivariate_normal([2, -2], [[0.5, 0], [0, 0.5]], 50)
    normal_multi = np.vstack([cluster1, cluster2, cluster3])
    outliers_multi = np.random.uniform(-5, 5, (15, 2))
    X_multi = np.vstack([normal_multi, outliers_multi])
    y_multi = np.hstack([np.zeros(150), np.ones(15)])
    datasets.append(('多聚类', X_multi, y_multi))
    
    # 异常检测算法
    algorithms = {
        '孤立森林': IsolationForest(contamination=0.1, random_state=42),
        'LOF': LocalOutlierFactor(contamination=0.1, n_neighbors=20),
        'One-Class SVM': OneClassSVM(nu=0.1, kernel='rbf', gamma='scale'),
        '椭圆包络': EllipticEnvelope(contamination=0.1, random_state=42)
    }
    
    # 存储结果
    results = {}
    
    print("数据集\t\t算法\t\t精确率\t召回率\tF1得分")
    print("-" * 60)
    
    fig, axes = plt.subplots(len(datasets), len(algorithms) + 1, figsize=(20, 12))
    fig.suptitle('异常检测算法比较', fontsize=16)
    
    for i, (dataset_name, X, y_true) in enumerate(datasets):
        results[dataset_name] = {}
        
        # 标准化数据
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        # 绘制原始数据
        normal_mask = y_true == 0
        anomaly_mask = y_true == 1
        
        axes[i, 0].scatter(X[normal_mask, 0], X[normal_mask, 1], 
                          c='blue', alpha=0.6, label='正常', s=20)
        axes[i, 0].scatter(X[anomaly_mask, 0], X[anomaly_mask, 1], 
                          c='red', alpha=0.8, label='异常', s=60, marker='x')
        axes[i, 0].set_title(f'{dataset_name}数据')
        axes[i, 0].legend()
        axes[i, 0].grid(True, alpha=0.3)
        
        for j, (alg_name, algorithm) in enumerate(algorithms.items()):
            try:
                # 训练和预测
                if alg_name == 'LOF':
                    y_pred = algorithm.fit_predict(X_scaled)
                else:
                    y_pred = algorithm.fit_predict(X_scaled)
                
                # 转换预测结果
                y_pred_binary = (y_pred == -1).astype(int)
                
                # 计算性能指标
                precision = precision_score(y_true, y_pred_binary, zero_division=0)
                recall = recall_score(y_true, y_pred_binary, zero_division=0)
                f1 = f1_score(y_true, y_pred_binary, zero_division=0)
                
                results[dataset_name][alg_name] = {
                    'precision': precision,
                    'recall': recall,
                    'f1': f1
                }
                
                print(f"{dataset_name}\t{alg_name}\t{precision:.3f}\t\t{recall:.3f}\t{f1:.3f}")
                
                # 可视化结果
                pred_normal_mask = y_pred_binary == 0
                pred_anomaly_mask = y_pred_binary == 1
                
                axes[i, j + 1].scatter(X[pred_normal_mask, 0], X[pred_normal_mask, 1], 
                                      c='blue', alpha=0.6, s=20)
                axes[i, j + 1].scatter(X[pred_anomaly_mask, 0], X[pred_anomaly_mask, 1], 
                                      c='red', alpha=0.8, s=40, marker='s')
                axes[i, j + 1].set_title(f'{alg_name}\nF1={f1:.3f}')
                axes[i, j + 1].grid(True, alpha=0.3)
                
            except Exception as e:
                print(f"{dataset_name}\t{alg_name}\t错误: {str(e)[:20]}")
                axes[i, j + 1].text(0.5, 0.5, '算法失败', ha='center', va='center',
                                   transform=axes[i, j + 1].transAxes)
                axes[i, j + 1].set_title(f'{alg_name}\n失败')
    
    plt.tight_layout()
    plt.show()
    
    return results

comparison_results = comprehensive_anomaly_detection_comparison()

13.5.2 算法选择指南

python
def anomaly_detection_selection_guide():
    """异常检测算法选择指南"""
    
    print("异常检测算法选择指南:")
    print("=" * 30)
    
    algorithm_guide = {
        '孤立森林': {
            '适用场景': ['大数据集', '高维数据', '全局异常检测'],
            '优点': ['训练快速', '内存效率高', '无需标准化', '处理高维数据好'],
            '缺点': ['对正常数据密度敏感', '参数调优困难'],
            '最佳使用': '当数据集较大且异常点相对孤立时',
            '参数': 'contamination, n_estimators, max_samples'
        },
        
        'LOF': {
            '适用场景': ['局部异常检测', '密度变化的数据', '中小数据集'],
            '优点': ['检测局部异常', '适应不同密度', '无需假设数据分布'],
            '缺点': ['计算复杂度高', '对参数敏感', '不适合高维数据'],
            '最佳使用': '当异常点在局部区域中密度明显不同时',
            '参数': 'n_neighbors, contamination'
        },
        
        'One-Class SVM': {
            '适用场景': ['非线性边界', '小数据集', '需要明确边界'],
            '优点': ['处理非线性边界', '理论基础强', '适合小数据集'],
            '缺点': ['参数调优复杂', '计算复杂度高', '对缩放敏感'],
            '最佳使用': '当正常数据有明确边界且数据集不太大时',
            '参数': 'nu, kernel, gamma'
        },
        
        '椭圆包络': {
            '适用场景': ['高斯分布数据', '需要概率解释', '低维数据'],
            '优点': ['假设明确', '计算快速', '提供概率解释'],
            '缺点': ['假设数据服从高斯分布', '对非高斯数据效果差'],
            '最佳使用': '当确信正常数据服从多元高斯分布时',
            '参数': 'contamination, support_fraction'
        }
    }
    
    for alg_name, info in algorithm_guide.items():
        print(f"\n{alg_name}:")
        print("-" * 20)
        for key, value in info.items():
            if isinstance(value, list):
                print(f"{key}: {', '.join(value)}")
            else:
                print(f"{key}: {value}")
    
    # 决策树
    print(f"\n算法选择决策树:")
    print("-" * 20)
    
    decision_tree = """
    数据集大小?
    ├─ 大(>10000) → 孤立森林
    └─ 小(<10000) ──┐

    数据分布?      │
    ├─ 高斯分布 → 椭圆包络
    ├─ 多聚类 → LOF
    └─ 未知 ──┐

    异常类型?│
    ├─ 全局异常 → 孤立森林 或 One-Class SVM
    ├─ 局部异常 → LOF
    └─ 边界异常 → One-Class SVM
    
    计算资源?
    ├─ 有限 → 椭圆包络 或 孤立森林
    └─ 充足 → LOF 或 One-Class SVM
    """
    
    print(decision_tree)
    
    # 性能总结表
    performance_summary = pd.DataFrame({
        '算法': ['孤立森林', 'LOF', 'One-Class SVM', '椭圆包络'],
        '训练速度': ['快', '慢', '中', '快'],
        '预测速度': ['快', '慢', '中', '快'],
        '内存使用': ['低', '高', '中', '低'],
        '参数敏感性': ['低', '高', '高', '低'],
        '高维适应性': ['好', '差', '中', '中'],
        '非线性处理': ['中', '好', '好', '差']
    })
    
    print(f"\n算法性能总结:")
    print(performance_summary.to_string(index=False))
    
    return algorithm_guide

algorithm_guide = anomaly_detection_selection_guide()

13.6 实际应用案例

13.6.1 信用卡欺诈检测

python
def credit_card_fraud_detection():
    """信用卡欺诈检测案例"""
    
    print("信用卡欺诈检测案例:")
    print("=" * 25)
    
    # 创建模拟信用卡交易数据
    np.random.seed(42)
    n_normal = 1000
    n_fraud = 50
    
    # 正常交易特征
    # 交易金额:对数正态分布
    normal_amounts = np.random.lognormal(mean=3, sigma=1, size=n_normal)
    # 交易时间:正常工作时间更多
    normal_hours = np.random.choice(range(24), size=n_normal, 
                                   p=[0.02]*6 + [0.08]*12 + [0.04]*6)  # 6-18点概率更高
    # 商户类型:正常分布
    normal_merchant_types = np.random.normal(5, 2, n_normal)
    # 地理位置:集中在某些区域
    normal_locations = np.random.multivariate_normal([0, 0], [[1, 0.3], [0.3, 1]], n_normal)
    
    # 欺诈交易特征
    # 交易金额:通常很大或很小
    fraud_amounts = np.concatenate([
        np.random.lognormal(mean=6, sigma=0.5, size=n_fraud//2),  # 大额
        np.random.lognormal(mean=1, sigma=0.5, size=n_fraud//2)   # 小额测试
    ])
    # 交易时间:异常时间更多
    fraud_hours = np.random.choice(range(24), size=n_fraud,
                                  p=[0.08]*6 + [0.02]*12 + [0.08]*6)  # 非工作时间概率更高
    # 商户类型:异常类型
    fraud_merchant_types = np.random.normal(10, 3, n_fraud)
    # 地理位置:异常位置
    fraud_locations = np.random.multivariate_normal([5, 5], [[2, 0], [0, 2]], n_fraud)
    
    # 合并数据
    amounts = np.concatenate([normal_amounts, fraud_amounts])
    hours = np.concatenate([normal_hours, fraud_hours])
    merchant_types = np.concatenate([normal_merchant_types, fraud_merchant_types])
    locations = np.vstack([normal_locations, fraud_locations])
    
    # 创建特征矩阵
    X_fraud = np.column_stack([
        amounts,
        hours,
        merchant_types,
        locations[:, 0],  # 纬度
        locations[:, 1]   # 经度
    ])
    
    y_fraud = np.concatenate([np.zeros(n_normal), np.ones(n_fraud)])
    
    feature_names = ['交易金额', '交易时间', '商户类型', '纬度', '经度']
    
    print(f"数据集信息:")
    print(f"  总交易数: {len(X_fraud)}")
    print(f"  正常交易: {n_normal} ({n_normal/len(X_fraud)*100:.1f}%)")
    print(f"  欺诈交易: {n_fraud} ({n_fraud/len(X_fraud)*100:.1f}%)")
    
    # 数据标准化
    scaler = StandardScaler()
    X_fraud_scaled = scaler.fit_transform(X_fraud)
    
    # 应用不同的异常检测算法
    algorithms = {
        '孤立森林': IsolationForest(contamination=0.05, random_state=42),
        'LOF': LocalOutlierFactor(contamination=0.05, n_neighbors=20),
        'One-Class SVM': OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')
    }
    
    results = {}
    
    print(f"\n欺诈检测结果:")
    print("算法\t\t精确率\t召回率\tF1得分\tAUC")
    print("-" * 50)
    
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle('信用卡欺诈检测', fontsize=16)
    
    for i, (alg_name, algorithm) in enumerate(algorithms.items()):
        # 训练和预测
        y_pred = algorithm.fit_predict(X_fraud_scaled)
        y_pred_binary = (y_pred == -1).astype(int)
        
        # 获取异常分数
        if hasattr(algorithm, 'decision_function'):
            scores = algorithm.decision_function(X_fraud_scaled)
        elif hasattr(algorithm, 'negative_outlier_factor_'):
            scores = -algorithm.negative_outlier_factor_
        else:
            scores = np.random.random(len(X_fraud_scaled))  # 占位符
        
        # 计算性能指标
        precision = precision_score(y_fraud, y_pred_binary)
        recall = recall_score(y_fraud, y_pred_binary)
        f1 = f1_score(y_fraud, y_pred_binary)
        
        # 计算AUC(需要转换分数)
        if alg_name == 'LOF':
            auc = roc_auc_score(y_fraud, scores)
        else:
            auc = roc_auc_score(y_fraud, -scores)  # 分数越低越异常
        
        results[alg_name] = {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'auc': auc,
            'predictions': y_pred_binary
        }
        
        print(f"{alg_name}\t{precision:.3f}\t\t{recall:.3f}\t{f1:.3f}\t{auc:.3f}")
        
        # 可视化:交易金额 vs 交易时间
        row = i // 3
        col = i % 3
        
        normal_mask = y_pred_binary == 0
        fraud_mask = y_pred_binary == 1
        
        axes[row, col].scatter(X_fraud[normal_mask, 0], X_fraud[normal_mask, 1], 
                              c='blue', alpha=0.6, label='预测正常', s=20)
        axes[row, col].scatter(X_fraud[fraud_mask, 0], X_fraud[fraud_mask, 1], 
                              c='red', alpha=0.8, label='预测欺诈', s=40, marker='s')
        axes[row, col].set_title(f'{alg_name} (F1={f1:.3f})')
        axes[row, col].set_xlabel('交易金额')
        axes[row, col].set_ylabel('交易时间')
        axes[row, col].legend()
        axes[row, col].grid(True, alpha=0.3)
    
    # 特征分布比较
    axes[1, 0].hist(X_fraud[y_fraud == 0, 0], bins=30, alpha=0.6, 
                   label='正常交易', color='blue', density=True)
    axes[1, 0].hist(X_fraud[y_fraud == 1, 0], bins=30, alpha=0.6, 
                   label='欺诈交易', color='red', density=True)
    axes[1, 0].set_title('交易金额分布')
    axes[1, 0].set_xlabel('交易金额')
    axes[1, 0].set_ylabel('密度')
    axes[1, 0].legend()
    axes[1, 0].grid(True, alpha=0.3)
    
    # 地理位置分布
    axes[1, 1].scatter(X_fraud[y_fraud == 0, 3], X_fraud[y_fraud == 0, 4], 
                      c='blue', alpha=0.6, label='正常交易', s=20)
    axes[1, 1].scatter(X_fraud[y_fraud == 1, 3], X_fraud[y_fraud == 1, 4], 
                      c='red', alpha=0.8, label='欺诈交易', s=60, marker='x')
    axes[1, 1].set_title('地理位置分布')
    axes[1, 1].set_xlabel('纬度')
    axes[1, 1].set_ylabel('经度')
    axes[1, 1].legend()
    axes[1, 1].grid(True, alpha=0.3)
    
    # 性能比较
    algorithms_list = list(results.keys())
    f1_scores = [results[alg]['f1'] for alg in algorithms_list]
    auc_scores = [results[alg]['auc'] for alg in algorithms_list]
    
    x_pos = np.arange(len(algorithms_list))
    width = 0.35
    
    axes[1, 2].bar(x_pos - width/2, f1_scores, width, label='F1得分', alpha=0.7)
    axes[1, 2].bar(x_pos + width/2, auc_scores, width, label='AUC得分', alpha=0.7)
    axes[1, 2].set_title('性能比较')
    axes[1, 2].set_xlabel('算法')
    axes[1, 2].set_ylabel('得分')
    axes[1, 2].set_xticks(x_pos)
    axes[1, 2].set_xticklabels(algorithms_list, rotation=45)
    axes[1, 2].legend()
    axes[1, 2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # 业务影响分析
    print(f"\n业务影响分析:")
    best_algorithm = max(results.keys(), key=lambda x: results[x]['f1'])
    best_results = results[best_algorithm]
    
    print(f"最佳算法: {best_algorithm}")
    print(f"  检测到的欺诈交易: {np.sum(best_results['predictions'])}")
    print(f"  实际欺诈交易: {np.sum(y_fraud)}")
    print(f"  漏检的欺诈交易: {np.sum(y_fraud) - np.sum((y_fraud == 1) & (best_results['predictions'] == 1))}")
    print(f"  误报的正常交易: {np.sum((y_fraud == 0) & (best_results['predictions'] == 1))}")
    
    return X_fraud, y_fraud, results

X_fraud, y_fraud, fraud_results = credit_card_fraud_detection()
```### 
13.6.2 网络入侵检测

```python
def network_intrusion_detection():
    """网络入侵检测案例"""
    
    print("网络入侵检测案例:")
    print("=" * 20)
    
    # 创建模拟网络流量数据
    np.random.seed(42)
    n_normal = 800
    n_intrusion = 40
    
    # 正常网络流量特征
    normal_packet_size = np.random.lognormal(mean=6, sigma=1, size=n_normal)
    normal_duration = np.random.exponential(scale=10, size=n_normal)
    normal_src_bytes = np.random.lognormal(mean=8, sigma=1.5, size=n_normal)
    normal_dst_bytes = np.random.lognormal(mean=7, sigma=1.2, size=n_normal)
    normal_protocol = np.random.choice([0, 1, 2], size=n_normal, p=[0.6, 0.3, 0.1])
    
    # 入侵流量特征(异常模式)
    intrusion_packet_size = np.random.lognormal(mean=4, sigma=2, size=n_intrusion)
    intrusion_duration = np.random.exponential(scale=50, size=n_intrusion)  # 更长的连接
    intrusion_src_bytes = np.random.lognormal(mean=10, sigma=2, size=n_intrusion)  # 更多数据
    intrusion_dst_bytes = np.random.lognormal(mean=5, sigma=2, size=n_intrusion)   # 不对称
    intrusion_protocol = np.random.choice([0, 1, 2], size=n_intrusion, p=[0.2, 0.2, 0.6])  # 异常协议
    
    # 合并数据
    X_network = np.column_stack([
        np.concatenate([normal_packet_size, intrusion_packet_size]),
        np.concatenate([normal_duration, intrusion_duration]),
        np.concatenate([normal_src_bytes, intrusion_src_bytes]),
        np.concatenate([normal_dst_bytes, intrusion_dst_bytes]),
        np.concatenate([normal_protocol, intrusion_protocol])
    ])
    
    y_network = np.concatenate([np.zeros(n_normal), np.ones(n_intrusion)])
    
    feature_names = ['包大小', '连接时长', '源字节数', '目标字节数', '协议类型']
    
    print(f"网络流量数据:")
    print(f"  总连接数: {len(X_network)}")
    print(f"  正常连接: {n_normal} ({n_normal/len(X_network)*100:.1f}%)")
    print(f"  入侵连接: {n_intrusion} ({n_intrusion/len(X_network)*100:.1f}%)")
    
    # 数据标准化
    scaler = StandardScaler()
    X_network_scaled = scaler.fit_transform(X_network)
    
    # 使用孤立森林进行入侵检测
    iso_forest_network = IsolationForest(
        contamination=0.05,
        random_state=42,
        n_estimators=200
    )
    
    y_pred_network = iso_forest_network.fit_predict(X_network_scaled)
    y_pred_network_binary = (y_pred_network == -1).astype(int)
    
    # 性能评估
    precision = precision_score(y_network, y_pred_network_binary)
    recall = recall_score(y_network, y_pred_network_binary)
    f1 = f1_score(y_network, y_pred_network_binary)
    
    print(f"\n入侵检测结果:")
    print(f"  精确率: {precision:.3f}")
    print(f"  召回率: {recall:.3f}")
    print(f"  F1得分: {f1:.3f}")
    
    # 可视化
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('网络入侵检测', fontsize=16)
    
    # 特征分布比较
    axes[0, 0].hist(X_network[y_network == 0, 1], bins=30, alpha=0.6, 
                   label='正常连接', color='blue', density=True)
    axes[0, 0].hist(X_network[y_network == 1, 1], bins=30, alpha=0.6, 
                   label='入侵连接', color='red', density=True)
    axes[0, 0].set_title('连接时长分布')
    axes[0, 0].set_xlabel('连接时长')
    axes[0, 0].set_ylabel('密度')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # 数据传输模式
    axes[0, 1].scatter(X_network[y_network == 0, 2], X_network[y_network == 0, 3], 
                      c='blue', alpha=0.6, label='正常连接', s=20)
    axes[0, 1].scatter(X_network[y_network == 1, 2], X_network[y_network == 1, 3], 
                      c='red', alpha=0.8, label='入侵连接', s=60, marker='x')
    axes[0, 1].set_title('数据传输模式')
    axes[0, 1].set_xlabel('源字节数')
    axes[0, 1].set_ylabel('目标字节数')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # 检测结果
    normal_mask = y_pred_network_binary == 0
    intrusion_mask = y_pred_network_binary == 1
    
    axes[1, 0].scatter(X_network[normal_mask, 1], X_network[normal_mask, 2], 
                      c='blue', alpha=0.6, label='预测正常', s=20)
    axes[1, 0].scatter(X_network[intrusion_mask, 1], X_network[intrusion_mask, 2], 
                      c='red', alpha=0.8, label='预测入侵', s=40, marker='s')
    axes[1, 0].set_title(f'入侵检测结果 (F1={f1:.3f})')
    axes[1, 0].set_xlabel('连接时长')
    axes[1, 0].set_ylabel('源字节数')
    axes[1, 0].legend()
    axes[1, 0].grid(True, alpha=0.3)
    
    # 混淆矩阵
    cm = confusion_matrix(y_network, y_pred_network_binary)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[1, 1])
    axes[1, 1].set_title('混淆矩阵')
    axes[1, 1].set_xlabel('预测标签')
    axes[1, 1].set_ylabel('真实标签')
    
    plt.tight_layout()
    plt.show()
    
    return X_network, y_network, iso_forest_network

X_network, y_network, iso_forest_network = network_intrusion_detection()

13.7 练习题

练习1:基础异常检测

  1. 创建一个包含异常点的二维数据集
  2. 使用Z-score、IQR和马氏距离方法检测异常
  3. 比较三种方法的检测效果

练习2:算法比较

  1. 使用make_blobs创建多聚类数据并添加异常点
  2. 应用孤立森林、LOF、One-Class SVM和椭圆包络
  3. 分析每种算法的优缺点

练习3:参数调优

  1. 使用孤立森林进行异常检测
  2. 调优contamination和n_estimators参数
  3. 分析参数对检测性能的影响

练习4:高维数据

  1. 创建一个高维数据集(特征数>20)
  2. 比较不同算法在高维数据上的表现
  3. 分析维度诅咒对异常检测的影响

练习5:实际应用

  1. 模拟一个质量控制场景的数据
  2. 构建异常检测系统识别缺陷产品
  3. 评估系统的业务价值和成本效益

13.8 小结

在本章中,我们深入学习了异常检测的各个方面:

核心概念

  • 异常类型:点异常、上下文异常、集体异常
  • 检测方法:统计方法、机器学习方法
  • 评估指标:精确率、召回率、F1得分、AUC

主要算法

  • 统计方法:Z-score、IQR、马氏距离、椭圆包络
  • 孤立森林:基于随机分割的异常检测
  • LOF:基于局部密度的异常检测
  • One-Class SVM:基于边界的异常检测

实践技能

  • 数据预处理:标准化、特征工程
  • 算法选择:根据数据特点选择合适算法
  • 参数调优:contamination等关键参数
  • 性能评估:多种指标综合评估

关键要点

  • 异常检测是无监督学习问题
  • 不同算法适用于不同类型的异常
  • 业务理解对异常检测至关重要
  • 需要平衡检测率和误报率

算法选择建议

使用孤立森林当

  • 数据集较大
  • 需要快速检测
  • 异常点相对孤立
  • 处理高维数据

使用LOF当

  • 需要检测局部异常
  • 数据密度变化较大
  • 数据集中等规模
  • 对检测精度要求高

使用One-Class SVM当

  • 需要非线性边界
  • 数据集较小
  • 需要明确的决策边界
  • 对理论基础有要求

使用椭圆包络当

  • 数据服从高斯分布
  • 需要快速检测
  • 需要概率解释
  • 数据维度不高

实际应用考虑

  1. 业务理解

    • 明确异常的定义
    • 了解异常的业务影响
    • 确定可接受的误报率
  2. 数据质量

    • 处理缺失值和噪声
    • 进行适当的特征工程
    • 考虑数据的时间性
  3. 模型部署

    • 考虑实时性要求
    • 建立监控和更新机制
    • 处理概念漂移问题

13.9 下一步

现在你已经掌握了异常检测这个重要的无监督学习技术!在下一章交叉验证中,我们将学习如何正确评估机器学习模型的性能,避免过拟合和数据泄露等问题。


章节要点回顾

  • ✅ 理解了异常检测的基本概念和应用场景
  • ✅ 掌握了统计方法和机器学习方法
  • ✅ 学会了不同算法的原理和实现
  • ✅ 了解了算法选择的标准和方法
  • ✅ 掌握了异常检测的评估指标
  • ✅ 能够在实际业务场景中应用异常检测技术

本站内容仅供学习和研究使用。