第13章:异常检测
异常检测是识别数据中不符合预期模式的观测值的过程。这些异常点可能表示错误、欺诈、系统故障或其他重要事件。本章将详细介绍各种异常检测算法的原理、实现和应用。
13.1 什么是异常检测?
异常检测(Anomaly Detection),也称为离群点检测(Outlier Detection),是识别与大多数数据显著不同的数据点的过程。
13.1.1 异常的类型
- 点异常:单个数据点异常
- 上下文异常:在特定上下文中异常
- 集体异常:一组数据点共同表现异常
13.1.2 异常检测的应用
- 欺诈检测:信用卡交易、保险理赔
- 网络安全:入侵检测、恶意软件识别
- 质量控制:制造业缺陷检测
- 医疗诊断:疾病筛查、异常生理指标
- 系统监控:服务器性能、网络流量
13.1.3 异常检测的挑战
- 标签稀缺:异常样本通常很少
- 类别不平衡:正常样本远多于异常样本
- 概念漂移:异常模式可能随时间变化
- 高维数据:维度诅咒影响检测效果
13.2 准备环境和数据
python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import make_blobs, make_classification
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.model_selection import train_test_split
import warnings
warnings.filterwarnings('ignore')
# 设置随机种子
np.random.seed(42)
# 设置图形样式
plt.style.use('seaborn-v0_8')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False13.3 统计方法
13.3.1 基于距离的异常检测
python
def statistical_anomaly_detection():
"""演示基于统计的异常检测方法"""
print("基于统计的异常检测方法:")
print("=" * 30)
# 创建正常数据
np.random.seed(42)
normal_data = np.random.multivariate_normal([0, 0], [[1, 0.5], [0.5, 1]], 200)
# 添加异常点
outliers = np.array([[4, 4], [-4, -4], [4, -4], [-4, 4], [0, 5], [5, 0]])
# 合并数据
X_combined = np.vstack([normal_data, outliers])
y_true = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
print(f"数据集大小: {len(X_combined)}")
print(f"正常样本: {len(normal_data)}")
print(f"异常样本: {len(outliers)}")
# 方法1: Z-score方法
def zscore_anomaly_detection(X, threshold=2.5):
"""基于Z-score的异常检测"""
mean = np.mean(X, axis=0)
std = np.std(X, axis=0)
z_scores = np.abs((X - mean) / std)
# 如果任一维度的Z-score超过阈值,则认为是异常
anomalies = np.any(z_scores > threshold, axis=1)
return anomalies.astype(int)
# 方法2: 马氏距离方法
def mahalanobis_anomaly_detection(X, threshold=2.5):
"""基于马氏距离的异常检测"""
mean = np.mean(X, axis=0)
cov = np.cov(X.T)
# 计算马氏距离
diff = X - mean
mahal_dist = np.sqrt(np.sum(diff @ np.linalg.inv(cov) * diff, axis=1))
# 基于阈值判断异常
anomalies = mahal_dist > threshold
return anomalies.astype(int)
# 方法3: 四分位距(IQR)方法
def iqr_anomaly_detection(X, factor=1.5):
"""基于IQR的异常检测"""
Q1 = np.percentile(X, 25, axis=0)
Q3 = np.percentile(X, 75, axis=0)
IQR = Q3 - Q1
lower_bound = Q1 - factor * IQR
upper_bound = Q3 + factor * IQR
# 如果任一维度超出边界,则认为是异常
anomalies = np.any((X < lower_bound) | (X > upper_bound), axis=1)
return anomalies.astype(int)
# 应用不同方法
methods = {
'Z-score': zscore_anomaly_detection(X_combined),
'马氏距离': mahalanobis_anomaly_detection(X_combined),
'IQR': iqr_anomaly_detection(X_combined)
}
# 可视化结果
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('基于统计的异常检测方法', fontsize=16)
# 原始数据
axes[0, 0].scatter(normal_data[:, 0], normal_data[:, 1],
c='blue', alpha=0.6, label='正常点', s=30)
axes[0, 0].scatter(outliers[:, 0], outliers[:, 1],
c='red', alpha=0.8, label='真实异常点', s=100, marker='x')
axes[0, 0].set_title('原始数据')
axes[0, 0].set_xlabel('特征 1')
axes[0, 0].set_ylabel('特征 2')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 不同方法的结果
method_names = list(methods.keys())
for i, (method_name, predictions) in enumerate(methods.items()):
row = (i + 1) // 2
col = (i + 1) % 2
# 计算性能指标
from sklearn.metrics import precision_score, recall_score, f1_score
precision = precision_score(y_true, predictions)
recall = recall_score(y_true, predictions)
f1 = f1_score(y_true, predictions)
print(f"\n{method_name}方法:")
print(f" 精确率: {precision:.3f}")
print(f" 召回率: {recall:.3f}")
print(f" F1得分: {f1:.3f}")
# 可视化
normal_mask = predictions == 0
anomaly_mask = predictions == 1
axes[row, col].scatter(X_combined[normal_mask, 0], X_combined[normal_mask, 1],
c='blue', alpha=0.6, label='预测正常', s=30)
axes[row, col].scatter(X_combined[anomaly_mask, 0], X_combined[anomaly_mask, 1],
c='red', alpha=0.8, label='预测异常', s=60, marker='s')
axes[row, col].set_title(f'{method_name} (F1={f1:.3f})')
axes[row, col].set_xlabel('特征 1')
axes[row, col].set_ylabel('特征 2')
axes[row, col].legend()
axes[row, col].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return X_combined, y_true, methods
X_combined, y_true, statistical_methods = statistical_anomaly_detection()
```##
# 13.3.2 椭圆包络法
```python
def elliptic_envelope_demo():
"""演示椭圆包络异常检测"""
print("椭圆包络异常检测:")
print("假设正常数据服从多元高斯分布")
# 使用之前的数据
# 椭圆包络法
elliptic_env = EllipticEnvelope(contamination=0.1, random_state=42)
y_pred_elliptic = elliptic_env.fit_predict(X_combined)
# 转换预测结果 (-1 -> 1, 1 -> 0)
y_pred_elliptic_binary = (y_pred_elliptic == -1).astype(int)
# 获取异常分数
anomaly_scores = elliptic_env.decision_function(X_combined)
# 计算性能指标
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
precision = precision_score(y_true, y_pred_elliptic_binary)
recall = recall_score(y_true, y_pred_elliptic_binary)
f1 = f1_score(y_true, y_pred_elliptic_binary)
accuracy = accuracy_score(y_true, y_pred_elliptic_binary)
print(f"椭圆包络法性能:")
print(f" 准确率: {accuracy:.3f}")
print(f" 精确率: {precision:.3f}")
print(f" 召回率: {recall:.3f}")
print(f" F1得分: {f1:.3f}")
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# 原始数据和预测结果
normal_mask = y_pred_elliptic_binary == 0
anomaly_mask = y_pred_elliptic_binary == 1
axes[0].scatter(X_combined[normal_mask, 0], X_combined[normal_mask, 1],
c='blue', alpha=0.6, label='预测正常', s=30)
axes[0].scatter(X_combined[anomaly_mask, 0], X_combined[anomaly_mask, 1],
c='red', alpha=0.8, label='预测异常', s=60, marker='s')
# 绘制椭圆边界
xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
grid_points = np.c_[xx.ravel(), yy.ravel()]
Z = elliptic_env.decision_function(grid_points)
Z = Z.reshape(xx.shape)
axes[0].contour(xx, yy, Z, levels=[0], colors='black', linestyles='--', linewidths=2)
axes[0].set_title('椭圆包络异常检测')
axes[0].set_xlabel('特征 1')
axes[0].set_ylabel('特征 2')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 异常分数分布
axes[1].hist(anomaly_scores[y_true == 0], bins=20, alpha=0.6,
label='正常样本', color='blue', density=True)
axes[1].hist(anomaly_scores[y_true == 1], bins=20, alpha=0.6,
label='异常样本', color='red', density=True)
axes[1].axvline(x=0, color='black', linestyle='--', alpha=0.7, label='决策边界')
axes[1].set_title('异常分数分布')
axes[1].set_xlabel('异常分数')
axes[1].set_ylabel('密度')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# ROC曲线
# 需要将异常分数转换为概率(分数越低越异常)
anomaly_probs = -anomaly_scores # 转换符号
fpr, tpr, _ = roc_curve(y_true, anomaly_probs)
auc_score = roc_auc_score(y_true, anomaly_probs)
axes[2].plot(fpr, tpr, linewidth=2, label=f'ROC曲线 (AUC = {auc_score:.3f})')
axes[2].plot([0, 1], [0, 1], 'k--', alpha=0.7, label='随机分类器')
axes[2].set_title('ROC曲线')
axes[2].set_xlabel('假正例率')
axes[2].set_ylabel('真正例率')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return elliptic_env, anomaly_scores
elliptic_env, anomaly_scores = elliptic_envelope_demo()13.4 基于机器学习的方法
13.4.1 孤立森林
python
def isolation_forest_demo():
"""演示孤立森林异常检测"""
print("孤立森林异常检测:")
print("基于随机分割的思想,异常点更容易被孤立")
# 创建更复杂的数据集
np.random.seed(42)
# 正常数据:两个聚类
cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0.1], [0.1, 0.5]], 100)
cluster2 = np.random.multivariate_normal([-2, -2], [[0.5, -0.1], [-0.1, 0.5]], 100)
normal_data = np.vstack([cluster1, cluster2])
# 异常数据:随机分布
outliers = np.random.uniform(-5, 5, (20, 2))
# 合并数据
X_iso = np.vstack([normal_data, outliers])
y_true_iso = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
print(f"数据集大小: {len(X_iso)}")
print(f"正常样本: {len(normal_data)}")
print(f"异常样本: {len(outliers)}")
# 孤立森林
iso_forest = IsolationForest(
contamination=0.1, # 预期异常比例
random_state=42,
n_estimators=100
)
y_pred_iso = iso_forest.fit_predict(X_iso)
y_pred_iso_binary = (y_pred_iso == -1).astype(int)
# 获取异常分数
anomaly_scores_iso = iso_forest.decision_function(X_iso)
# 性能评估
precision = precision_score(y_true_iso, y_pred_iso_binary)
recall = recall_score(y_true_iso, y_pred_iso_binary)
f1 = f1_score(y_true_iso, y_pred_iso_binary)
print(f"\n孤立森林性能:")
print(f" 精确率: {precision:.3f}")
print(f" 召回率: {recall:.3f}")
print(f" F1得分: {f1:.3f}")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('孤立森林异常检测', fontsize=16)
# 原始数据
axes[0, 0].scatter(normal_data[:, 0], normal_data[:, 1],
c='blue', alpha=0.6, label='正常点', s=30)
axes[0, 0].scatter(outliers[:, 0], outliers[:, 1],
c='red', alpha=0.8, label='真实异常点', s=100, marker='x')
axes[0, 0].set_title('原始数据')
axes[0, 0].set_xlabel('特征 1')
axes[0, 0].set_ylabel('特征 2')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 孤立森林预测结果
normal_mask = y_pred_iso_binary == 0
anomaly_mask = y_pred_iso_binary == 1
axes[0, 1].scatter(X_iso[normal_mask, 0], X_iso[normal_mask, 1],
c='blue', alpha=0.6, label='预测正常', s=30)
axes[0, 1].scatter(X_iso[anomaly_mask, 0], X_iso[anomaly_mask, 1],
c='red', alpha=0.8, label='预测异常', s=60, marker='s')
axes[0, 1].set_title(f'孤立森林预测 (F1={f1:.3f})')
axes[0, 1].set_xlabel('特征 1')
axes[0, 1].set_ylabel('特征 2')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 异常分数热力图
xx, yy = np.meshgrid(np.linspace(-6, 6, 50), np.linspace(-6, 6, 50))
grid_points = np.c_[xx.ravel(), yy.ravel()]
Z = iso_forest.decision_function(grid_points)
Z = Z.reshape(xx.shape)
contour = axes[1, 0].contourf(xx, yy, Z, levels=20, cmap='RdYlBu', alpha=0.7)
axes[1, 0].scatter(X_iso[:, 0], X_iso[:, 1], c=y_true_iso,
cmap='RdYlBu', edgecolors='black', s=50)
axes[1, 0].set_title('异常分数热力图')
axes[1, 0].set_xlabel('特征 1')
axes[1, 0].set_ylabel('特征 2')
plt.colorbar(contour, ax=axes[1, 0], label='异常分数')
# 异常分数分布
axes[1, 1].hist(anomaly_scores_iso[y_true_iso == 0], bins=20, alpha=0.6,
label='正常样本', color='blue', density=True)
axes[1, 1].hist(anomaly_scores_iso[y_true_iso == 1], bins=20, alpha=0.6,
label='异常样本', color='red', density=True)
axes[1, 1].set_title('异常分数分布')
axes[1, 1].set_xlabel('异常分数')
axes[1, 1].set_ylabel('密度')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return X_iso, y_true_iso, iso_forest
X_iso, y_true_iso, iso_forest = isolation_forest_demo()13.4.2 局部异常因子 (LOF)
python
def local_outlier_factor_demo():
"""演示局部异常因子异常检测"""
print("局部异常因子 (LOF) 异常检测:")
print("基于局部密度的异常检测,适合检测局部异常")
# 创建具有不同密度区域的数据
np.random.seed(42)
# 高密度区域
dense_cluster = np.random.multivariate_normal([0, 0], [[0.2, 0], [0, 0.2]], 80)
# 低密度区域
sparse_cluster = np.random.multivariate_normal([4, 4], [[1, 0], [0, 1]], 40)
# 正常数据
normal_data = np.vstack([dense_cluster, sparse_cluster])
# 异常点:在密集区域中的稀疏点,在稀疏区域中的密集点
outliers = np.array([[0, 3], [3, 0], [-3, 0], [0, -3], [7, 7], [1, 7]])
# 合并数据
X_lof = np.vstack([normal_data, outliers])
y_true_lof = np.hstack([np.zeros(len(normal_data)), np.ones(len(outliers))])
print(f"数据集大小: {len(X_lof)}")
print(f"正常样本: {len(normal_data)}")
print(f"异常样本: {len(outliers)}")
# LOF异常检测
lof = LocalOutlierFactor(
n_neighbors=20,
contamination=0.1,
novelty=False # 用于训练数据的异常检测
)
y_pred_lof = lof.fit_predict(X_lof)
y_pred_lof_binary = (y_pred_lof == -1).astype(int)
# 获取LOF分数
lof_scores = -lof.negative_outlier_factor_ # 转换为正值,值越大越异常
# 性能评估
precision = precision_score(y_true_lof, y_pred_lof_binary)
recall = recall_score(y_true_lof, y_pred_lof_binary)
f1 = f1_score(y_true_lof, y_pred_lof_binary)
print(f"\nLOF性能:")
print(f" 精确率: {precision:.3f}")
print(f" 召回率: {recall:.3f}")
print(f" F1得分: {f1:.3f}")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('局部异常因子 (LOF) 异常检测', fontsize=16)
# 原始数据
axes[0, 0].scatter(dense_cluster[:, 0], dense_cluster[:, 1],
c='lightblue', alpha=0.6, label='高密度区域', s=30)
axes[0, 0].scatter(sparse_cluster[:, 0], sparse_cluster[:, 1],
c='lightgreen', alpha=0.6, label='低密度区域', s=30)
axes[0, 0].scatter(outliers[:, 0], outliers[:, 1],
c='red', alpha=0.8, label='真实异常点', s=100, marker='x')
axes[0, 0].set_title('原始数据(不同密度区域)')
axes[0, 0].set_xlabel('特征 1')
axes[0, 0].set_ylabel('特征 2')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# LOF预测结果
normal_mask = y_pred_lof_binary == 0
anomaly_mask = y_pred_lof_binary == 1
axes[0, 1].scatter(X_lof[normal_mask, 0], X_lof[normal_mask, 1],
c='blue', alpha=0.6, label='预测正常', s=30)
axes[0, 1].scatter(X_lof[anomaly_mask, 0], X_lof[anomaly_mask, 1],
c='red', alpha=0.8, label='预测异常', s=60, marker='s')
axes[0, 1].set_title(f'LOF预测结果 (F1={f1:.3f})')
axes[0, 1].set_xlabel('特征 1')
axes[0, 1].set_ylabel('特征 2')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# LOF分数可视化
scatter = axes[1, 0].scatter(X_lof[:, 0], X_lof[:, 1], c=lof_scores,
cmap='Reds', s=50, alpha=0.7)
axes[1, 0].set_title('LOF分数可视化')
axes[1, 0].set_xlabel('特征 1')
axes[1, 0].set_ylabel('特征 2')
plt.colorbar(scatter, ax=axes[1, 0], label='LOF分数')
# LOF分数分布
axes[1, 1].hist(lof_scores[y_true_lof == 0], bins=20, alpha=0.6,
label='正常样本', color='blue', density=True)
axes[1, 1].hist(lof_scores[y_true_lof == 1], bins=20, alpha=0.6,
label='异常样本', color='red', density=True)
axes[1, 1].set_title('LOF分数分布')
axes[1, 1].set_xlabel('LOF分数')
axes[1, 1].set_ylabel('密度')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return X_lof, y_true_lof, lof
X_lof, y_true_lof, lof = local_outlier_factor_demo()13.4.3 One-Class SVM
python
def one_class_svm_demo():
"""演示One-Class SVM异常检测"""
print("One-Class SVM异常检测:")
print("学习正常数据的边界,将边界外的点视为异常")
# 使用之前的数据
# One-Class SVM
oc_svm = OneClassSVM(
kernel='rbf',
gamma='scale',
nu=0.1 # 预期异常比例的上界
)
y_pred_svm = oc_svm.fit_predict(X_iso)
y_pred_svm_binary = (y_pred_svm == -1).astype(int)
# 获取决策分数
decision_scores = oc_svm.decision_function(X_iso)
# 性能评估
precision = precision_score(y_true_iso, y_pred_svm_binary)
recall = recall_score(y_true_iso, y_pred_svm_binary)
f1 = f1_score(y_true_iso, y_pred_svm_binary)
print(f"\nOne-Class SVM性能:")
print(f" 精确率: {precision:.3f}")
print(f" 召回率: {recall:.3f}")
print(f" F1得分: {f1:.3f}")
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
# 预测结果
normal_mask = y_pred_svm_binary == 0
anomaly_mask = y_pred_svm_binary == 1
axes[0].scatter(X_iso[normal_mask, 0], X_iso[normal_mask, 1],
c='blue', alpha=0.6, label='预测正常', s=30)
axes[0].scatter(X_iso[anomaly_mask, 0], X_iso[anomaly_mask, 1],
c='red', alpha=0.8, label='预测异常', s=60, marker='s')
# 绘制决策边界
xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
grid_points = np.c_[xx.ravel(), yy.ravel()]
Z = oc_svm.decision_function(grid_points)
Z = Z.reshape(xx.shape)
axes[0].contour(xx, yy, Z, levels=[0], colors='black', linestyles='--', linewidths=2)
axes[0].set_title(f'One-Class SVM (F1={f1:.3f})')
axes[0].set_xlabel('特征 1')
axes[0].set_ylabel('特征 2')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 决策分数热力图
contour = axes[1].contourf(xx, yy, Z, levels=20, cmap='RdYlBu', alpha=0.7)
axes[1].scatter(X_iso[:, 0], X_iso[:, 1], c=y_true_iso,
cmap='RdYlBu', edgecolors='black', s=50)
axes[1].contour(xx, yy, Z, levels=[0], colors='black', linestyles='-', linewidths=2)
axes[1].set_title('决策边界和分数')
axes[1].set_xlabel('特征 1')
axes[1].set_ylabel('特征 2')
plt.colorbar(contour, ax=axes[1], label='决策分数')
# 决策分数分布
axes[2].hist(decision_scores[y_true_iso == 0], bins=20, alpha=0.6,
label='正常样本', color='blue', density=True)
axes[2].hist(decision_scores[y_true_iso == 1], bins=20, alpha=0.6,
label='异常样本', color='red', density=True)
axes[2].axvline(x=0, color='black', linestyle='--', alpha=0.7, label='决策边界')
axes[2].set_title('决策分数分布')
axes[2].set_xlabel('决策分数')
axes[2].set_ylabel('密度')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return oc_svm, decision_scores
oc_svm, decision_scores = one_class_svm_demo()
```## 13.
5 算法比较和选择
### 13.5.1 综合性能比较
```python
def comprehensive_anomaly_detection_comparison():
"""综合比较不同异常检测算法"""
print("异常检测算法综合比较:")
print("=" * 30)
# 创建多种类型的数据集
datasets = []
# 1. 球形聚类数据
np.random.seed(42)
normal_spherical = np.random.multivariate_normal([0, 0], [[1, 0], [0, 1]], 150)
outliers_spherical = np.random.uniform(-4, 4, (15, 2))
X_spherical = np.vstack([normal_spherical, outliers_spherical])
y_spherical = np.hstack([np.zeros(150), np.ones(15)])
datasets.append(('球形聚类', X_spherical, y_spherical))
# 2. 椭圆聚类数据
normal_elliptical = np.random.multivariate_normal([0, 0], [[2, 1.5], [1.5, 2]], 150)
outliers_elliptical = np.array([[5, 5], [-5, -5], [5, -5], [-5, 5], [0, 6], [6, 0]])
outliers_elliptical = np.vstack([outliers_elliptical, np.random.uniform(-6, 6, (9, 2))])
X_elliptical = np.vstack([normal_elliptical, outliers_elliptical])
y_elliptical = np.hstack([np.zeros(150), np.ones(15)])
datasets.append(('椭圆聚类', X_elliptical, y_elliptical))
# 3. 多聚类数据
cluster1 = np.random.multivariate_normal([2, 2], [[0.5, 0], [0, 0.5]], 50)
cluster2 = np.random.multivariate_normal([-2, -2], [[0.5, 0], [0, 0.5]], 50)
cluster3 = np.random.multivariate_normal([2, -2], [[0.5, 0], [0, 0.5]], 50)
normal_multi = np.vstack([cluster1, cluster2, cluster3])
outliers_multi = np.random.uniform(-5, 5, (15, 2))
X_multi = np.vstack([normal_multi, outliers_multi])
y_multi = np.hstack([np.zeros(150), np.ones(15)])
datasets.append(('多聚类', X_multi, y_multi))
# 异常检测算法
algorithms = {
'孤立森林': IsolationForest(contamination=0.1, random_state=42),
'LOF': LocalOutlierFactor(contamination=0.1, n_neighbors=20),
'One-Class SVM': OneClassSVM(nu=0.1, kernel='rbf', gamma='scale'),
'椭圆包络': EllipticEnvelope(contamination=0.1, random_state=42)
}
# 存储结果
results = {}
print("数据集\t\t算法\t\t精确率\t召回率\tF1得分")
print("-" * 60)
fig, axes = plt.subplots(len(datasets), len(algorithms) + 1, figsize=(20, 12))
fig.suptitle('异常检测算法比较', fontsize=16)
for i, (dataset_name, X, y_true) in enumerate(datasets):
results[dataset_name] = {}
# 标准化数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 绘制原始数据
normal_mask = y_true == 0
anomaly_mask = y_true == 1
axes[i, 0].scatter(X[normal_mask, 0], X[normal_mask, 1],
c='blue', alpha=0.6, label='正常', s=20)
axes[i, 0].scatter(X[anomaly_mask, 0], X[anomaly_mask, 1],
c='red', alpha=0.8, label='异常', s=60, marker='x')
axes[i, 0].set_title(f'{dataset_name}数据')
axes[i, 0].legend()
axes[i, 0].grid(True, alpha=0.3)
for j, (alg_name, algorithm) in enumerate(algorithms.items()):
try:
# 训练和预测
if alg_name == 'LOF':
y_pred = algorithm.fit_predict(X_scaled)
else:
y_pred = algorithm.fit_predict(X_scaled)
# 转换预测结果
y_pred_binary = (y_pred == -1).astype(int)
# 计算性能指标
precision = precision_score(y_true, y_pred_binary, zero_division=0)
recall = recall_score(y_true, y_pred_binary, zero_division=0)
f1 = f1_score(y_true, y_pred_binary, zero_division=0)
results[dataset_name][alg_name] = {
'precision': precision,
'recall': recall,
'f1': f1
}
print(f"{dataset_name}\t{alg_name}\t{precision:.3f}\t\t{recall:.3f}\t{f1:.3f}")
# 可视化结果
pred_normal_mask = y_pred_binary == 0
pred_anomaly_mask = y_pred_binary == 1
axes[i, j + 1].scatter(X[pred_normal_mask, 0], X[pred_normal_mask, 1],
c='blue', alpha=0.6, s=20)
axes[i, j + 1].scatter(X[pred_anomaly_mask, 0], X[pred_anomaly_mask, 1],
c='red', alpha=0.8, s=40, marker='s')
axes[i, j + 1].set_title(f'{alg_name}\nF1={f1:.3f}')
axes[i, j + 1].grid(True, alpha=0.3)
except Exception as e:
print(f"{dataset_name}\t{alg_name}\t错误: {str(e)[:20]}")
axes[i, j + 1].text(0.5, 0.5, '算法失败', ha='center', va='center',
transform=axes[i, j + 1].transAxes)
axes[i, j + 1].set_title(f'{alg_name}\n失败')
plt.tight_layout()
plt.show()
return results
comparison_results = comprehensive_anomaly_detection_comparison()13.5.2 算法选择指南
python
def anomaly_detection_selection_guide():
"""异常检测算法选择指南"""
print("异常检测算法选择指南:")
print("=" * 30)
algorithm_guide = {
'孤立森林': {
'适用场景': ['大数据集', '高维数据', '全局异常检测'],
'优点': ['训练快速', '内存效率高', '无需标准化', '处理高维数据好'],
'缺点': ['对正常数据密度敏感', '参数调优困难'],
'最佳使用': '当数据集较大且异常点相对孤立时',
'参数': 'contamination, n_estimators, max_samples'
},
'LOF': {
'适用场景': ['局部异常检测', '密度变化的数据', '中小数据集'],
'优点': ['检测局部异常', '适应不同密度', '无需假设数据分布'],
'缺点': ['计算复杂度高', '对参数敏感', '不适合高维数据'],
'最佳使用': '当异常点在局部区域中密度明显不同时',
'参数': 'n_neighbors, contamination'
},
'One-Class SVM': {
'适用场景': ['非线性边界', '小数据集', '需要明确边界'],
'优点': ['处理非线性边界', '理论基础强', '适合小数据集'],
'缺点': ['参数调优复杂', '计算复杂度高', '对缩放敏感'],
'最佳使用': '当正常数据有明确边界且数据集不太大时',
'参数': 'nu, kernel, gamma'
},
'椭圆包络': {
'适用场景': ['高斯分布数据', '需要概率解释', '低维数据'],
'优点': ['假设明确', '计算快速', '提供概率解释'],
'缺点': ['假设数据服从高斯分布', '对非高斯数据效果差'],
'最佳使用': '当确信正常数据服从多元高斯分布时',
'参数': 'contamination, support_fraction'
}
}
for alg_name, info in algorithm_guide.items():
print(f"\n{alg_name}:")
print("-" * 20)
for key, value in info.items():
if isinstance(value, list):
print(f"{key}: {', '.join(value)}")
else:
print(f"{key}: {value}")
# 决策树
print(f"\n算法选择决策树:")
print("-" * 20)
decision_tree = """
数据集大小?
├─ 大(>10000) → 孤立森林
└─ 小(<10000) ──┐
│
数据分布? │
├─ 高斯分布 → 椭圆包络
├─ 多聚类 → LOF
└─ 未知 ──┐
│
异常类型?│
├─ 全局异常 → 孤立森林 或 One-Class SVM
├─ 局部异常 → LOF
└─ 边界异常 → One-Class SVM
计算资源?
├─ 有限 → 椭圆包络 或 孤立森林
└─ 充足 → LOF 或 One-Class SVM
"""
print(decision_tree)
# 性能总结表
performance_summary = pd.DataFrame({
'算法': ['孤立森林', 'LOF', 'One-Class SVM', '椭圆包络'],
'训练速度': ['快', '慢', '中', '快'],
'预测速度': ['快', '慢', '中', '快'],
'内存使用': ['低', '高', '中', '低'],
'参数敏感性': ['低', '高', '高', '低'],
'高维适应性': ['好', '差', '中', '中'],
'非线性处理': ['中', '好', '好', '差']
})
print(f"\n算法性能总结:")
print(performance_summary.to_string(index=False))
return algorithm_guide
algorithm_guide = anomaly_detection_selection_guide()13.6 实际应用案例
13.6.1 信用卡欺诈检测
python
def credit_card_fraud_detection():
"""信用卡欺诈检测案例"""
print("信用卡欺诈检测案例:")
print("=" * 25)
# 创建模拟信用卡交易数据
np.random.seed(42)
n_normal = 1000
n_fraud = 50
# 正常交易特征
# 交易金额:对数正态分布
normal_amounts = np.random.lognormal(mean=3, sigma=1, size=n_normal)
# 交易时间:正常工作时间更多
normal_hours = np.random.choice(range(24), size=n_normal,
p=[0.02]*6 + [0.08]*12 + [0.04]*6) # 6-18点概率更高
# 商户类型:正常分布
normal_merchant_types = np.random.normal(5, 2, n_normal)
# 地理位置:集中在某些区域
normal_locations = np.random.multivariate_normal([0, 0], [[1, 0.3], [0.3, 1]], n_normal)
# 欺诈交易特征
# 交易金额:通常很大或很小
fraud_amounts = np.concatenate([
np.random.lognormal(mean=6, sigma=0.5, size=n_fraud//2), # 大额
np.random.lognormal(mean=1, sigma=0.5, size=n_fraud//2) # 小额测试
])
# 交易时间:异常时间更多
fraud_hours = np.random.choice(range(24), size=n_fraud,
p=[0.08]*6 + [0.02]*12 + [0.08]*6) # 非工作时间概率更高
# 商户类型:异常类型
fraud_merchant_types = np.random.normal(10, 3, n_fraud)
# 地理位置:异常位置
fraud_locations = np.random.multivariate_normal([5, 5], [[2, 0], [0, 2]], n_fraud)
# 合并数据
amounts = np.concatenate([normal_amounts, fraud_amounts])
hours = np.concatenate([normal_hours, fraud_hours])
merchant_types = np.concatenate([normal_merchant_types, fraud_merchant_types])
locations = np.vstack([normal_locations, fraud_locations])
# 创建特征矩阵
X_fraud = np.column_stack([
amounts,
hours,
merchant_types,
locations[:, 0], # 纬度
locations[:, 1] # 经度
])
y_fraud = np.concatenate([np.zeros(n_normal), np.ones(n_fraud)])
feature_names = ['交易金额', '交易时间', '商户类型', '纬度', '经度']
print(f"数据集信息:")
print(f" 总交易数: {len(X_fraud)}")
print(f" 正常交易: {n_normal} ({n_normal/len(X_fraud)*100:.1f}%)")
print(f" 欺诈交易: {n_fraud} ({n_fraud/len(X_fraud)*100:.1f}%)")
# 数据标准化
scaler = StandardScaler()
X_fraud_scaled = scaler.fit_transform(X_fraud)
# 应用不同的异常检测算法
algorithms = {
'孤立森林': IsolationForest(contamination=0.05, random_state=42),
'LOF': LocalOutlierFactor(contamination=0.05, n_neighbors=20),
'One-Class SVM': OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')
}
results = {}
print(f"\n欺诈检测结果:")
print("算法\t\t精确率\t召回率\tF1得分\tAUC")
print("-" * 50)
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('信用卡欺诈检测', fontsize=16)
for i, (alg_name, algorithm) in enumerate(algorithms.items()):
# 训练和预测
y_pred = algorithm.fit_predict(X_fraud_scaled)
y_pred_binary = (y_pred == -1).astype(int)
# 获取异常分数
if hasattr(algorithm, 'decision_function'):
scores = algorithm.decision_function(X_fraud_scaled)
elif hasattr(algorithm, 'negative_outlier_factor_'):
scores = -algorithm.negative_outlier_factor_
else:
scores = np.random.random(len(X_fraud_scaled)) # 占位符
# 计算性能指标
precision = precision_score(y_fraud, y_pred_binary)
recall = recall_score(y_fraud, y_pred_binary)
f1 = f1_score(y_fraud, y_pred_binary)
# 计算AUC(需要转换分数)
if alg_name == 'LOF':
auc = roc_auc_score(y_fraud, scores)
else:
auc = roc_auc_score(y_fraud, -scores) # 分数越低越异常
results[alg_name] = {
'precision': precision,
'recall': recall,
'f1': f1,
'auc': auc,
'predictions': y_pred_binary
}
print(f"{alg_name}\t{precision:.3f}\t\t{recall:.3f}\t{f1:.3f}\t{auc:.3f}")
# 可视化:交易金额 vs 交易时间
row = i // 3
col = i % 3
normal_mask = y_pred_binary == 0
fraud_mask = y_pred_binary == 1
axes[row, col].scatter(X_fraud[normal_mask, 0], X_fraud[normal_mask, 1],
c='blue', alpha=0.6, label='预测正常', s=20)
axes[row, col].scatter(X_fraud[fraud_mask, 0], X_fraud[fraud_mask, 1],
c='red', alpha=0.8, label='预测欺诈', s=40, marker='s')
axes[row, col].set_title(f'{alg_name} (F1={f1:.3f})')
axes[row, col].set_xlabel('交易金额')
axes[row, col].set_ylabel('交易时间')
axes[row, col].legend()
axes[row, col].grid(True, alpha=0.3)
# 特征分布比较
axes[1, 0].hist(X_fraud[y_fraud == 0, 0], bins=30, alpha=0.6,
label='正常交易', color='blue', density=True)
axes[1, 0].hist(X_fraud[y_fraud == 1, 0], bins=30, alpha=0.6,
label='欺诈交易', color='red', density=True)
axes[1, 0].set_title('交易金额分布')
axes[1, 0].set_xlabel('交易金额')
axes[1, 0].set_ylabel('密度')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 地理位置分布
axes[1, 1].scatter(X_fraud[y_fraud == 0, 3], X_fraud[y_fraud == 0, 4],
c='blue', alpha=0.6, label='正常交易', s=20)
axes[1, 1].scatter(X_fraud[y_fraud == 1, 3], X_fraud[y_fraud == 1, 4],
c='red', alpha=0.8, label='欺诈交易', s=60, marker='x')
axes[1, 1].set_title('地理位置分布')
axes[1, 1].set_xlabel('纬度')
axes[1, 1].set_ylabel('经度')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# 性能比较
algorithms_list = list(results.keys())
f1_scores = [results[alg]['f1'] for alg in algorithms_list]
auc_scores = [results[alg]['auc'] for alg in algorithms_list]
x_pos = np.arange(len(algorithms_list))
width = 0.35
axes[1, 2].bar(x_pos - width/2, f1_scores, width, label='F1得分', alpha=0.7)
axes[1, 2].bar(x_pos + width/2, auc_scores, width, label='AUC得分', alpha=0.7)
axes[1, 2].set_title('性能比较')
axes[1, 2].set_xlabel('算法')
axes[1, 2].set_ylabel('得分')
axes[1, 2].set_xticks(x_pos)
axes[1, 2].set_xticklabels(algorithms_list, rotation=45)
axes[1, 2].legend()
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 业务影响分析
print(f"\n业务影响分析:")
best_algorithm = max(results.keys(), key=lambda x: results[x]['f1'])
best_results = results[best_algorithm]
print(f"最佳算法: {best_algorithm}")
print(f" 检测到的欺诈交易: {np.sum(best_results['predictions'])}")
print(f" 实际欺诈交易: {np.sum(y_fraud)}")
print(f" 漏检的欺诈交易: {np.sum(y_fraud) - np.sum((y_fraud == 1) & (best_results['predictions'] == 1))}")
print(f" 误报的正常交易: {np.sum((y_fraud == 0) & (best_results['predictions'] == 1))}")
return X_fraud, y_fraud, results
X_fraud, y_fraud, fraud_results = credit_card_fraud_detection()
```###
13.6.2 网络入侵检测
```python
def network_intrusion_detection():
"""网络入侵检测案例"""
print("网络入侵检测案例:")
print("=" * 20)
# 创建模拟网络流量数据
np.random.seed(42)
n_normal = 800
n_intrusion = 40
# 正常网络流量特征
normal_packet_size = np.random.lognormal(mean=6, sigma=1, size=n_normal)
normal_duration = np.random.exponential(scale=10, size=n_normal)
normal_src_bytes = np.random.lognormal(mean=8, sigma=1.5, size=n_normal)
normal_dst_bytes = np.random.lognormal(mean=7, sigma=1.2, size=n_normal)
normal_protocol = np.random.choice([0, 1, 2], size=n_normal, p=[0.6, 0.3, 0.1])
# 入侵流量特征(异常模式)
intrusion_packet_size = np.random.lognormal(mean=4, sigma=2, size=n_intrusion)
intrusion_duration = np.random.exponential(scale=50, size=n_intrusion) # 更长的连接
intrusion_src_bytes = np.random.lognormal(mean=10, sigma=2, size=n_intrusion) # 更多数据
intrusion_dst_bytes = np.random.lognormal(mean=5, sigma=2, size=n_intrusion) # 不对称
intrusion_protocol = np.random.choice([0, 1, 2], size=n_intrusion, p=[0.2, 0.2, 0.6]) # 异常协议
# 合并数据
X_network = np.column_stack([
np.concatenate([normal_packet_size, intrusion_packet_size]),
np.concatenate([normal_duration, intrusion_duration]),
np.concatenate([normal_src_bytes, intrusion_src_bytes]),
np.concatenate([normal_dst_bytes, intrusion_dst_bytes]),
np.concatenate([normal_protocol, intrusion_protocol])
])
y_network = np.concatenate([np.zeros(n_normal), np.ones(n_intrusion)])
feature_names = ['包大小', '连接时长', '源字节数', '目标字节数', '协议类型']
print(f"网络流量数据:")
print(f" 总连接数: {len(X_network)}")
print(f" 正常连接: {n_normal} ({n_normal/len(X_network)*100:.1f}%)")
print(f" 入侵连接: {n_intrusion} ({n_intrusion/len(X_network)*100:.1f}%)")
# 数据标准化
scaler = StandardScaler()
X_network_scaled = scaler.fit_transform(X_network)
# 使用孤立森林进行入侵检测
iso_forest_network = IsolationForest(
contamination=0.05,
random_state=42,
n_estimators=200
)
y_pred_network = iso_forest_network.fit_predict(X_network_scaled)
y_pred_network_binary = (y_pred_network == -1).astype(int)
# 性能评估
precision = precision_score(y_network, y_pred_network_binary)
recall = recall_score(y_network, y_pred_network_binary)
f1 = f1_score(y_network, y_pred_network_binary)
print(f"\n入侵检测结果:")
print(f" 精确率: {precision:.3f}")
print(f" 召回率: {recall:.3f}")
print(f" F1得分: {f1:.3f}")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('网络入侵检测', fontsize=16)
# 特征分布比较
axes[0, 0].hist(X_network[y_network == 0, 1], bins=30, alpha=0.6,
label='正常连接', color='blue', density=True)
axes[0, 0].hist(X_network[y_network == 1, 1], bins=30, alpha=0.6,
label='入侵连接', color='red', density=True)
axes[0, 0].set_title('连接时长分布')
axes[0, 0].set_xlabel('连接时长')
axes[0, 0].set_ylabel('密度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 数据传输模式
axes[0, 1].scatter(X_network[y_network == 0, 2], X_network[y_network == 0, 3],
c='blue', alpha=0.6, label='正常连接', s=20)
axes[0, 1].scatter(X_network[y_network == 1, 2], X_network[y_network == 1, 3],
c='red', alpha=0.8, label='入侵连接', s=60, marker='x')
axes[0, 1].set_title('数据传输模式')
axes[0, 1].set_xlabel('源字节数')
axes[0, 1].set_ylabel('目标字节数')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 检测结果
normal_mask = y_pred_network_binary == 0
intrusion_mask = y_pred_network_binary == 1
axes[1, 0].scatter(X_network[normal_mask, 1], X_network[normal_mask, 2],
c='blue', alpha=0.6, label='预测正常', s=20)
axes[1, 0].scatter(X_network[intrusion_mask, 1], X_network[intrusion_mask, 2],
c='red', alpha=0.8, label='预测入侵', s=40, marker='s')
axes[1, 0].set_title(f'入侵检测结果 (F1={f1:.3f})')
axes[1, 0].set_xlabel('连接时长')
axes[1, 0].set_ylabel('源字节数')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 混淆矩阵
cm = confusion_matrix(y_network, y_pred_network_binary)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[1, 1])
axes[1, 1].set_title('混淆矩阵')
axes[1, 1].set_xlabel('预测标签')
axes[1, 1].set_ylabel('真实标签')
plt.tight_layout()
plt.show()
return X_network, y_network, iso_forest_network
X_network, y_network, iso_forest_network = network_intrusion_detection()13.7 练习题
练习1:基础异常检测
- 创建一个包含异常点的二维数据集
- 使用Z-score、IQR和马氏距离方法检测异常
- 比较三种方法的检测效果
练习2:算法比较
- 使用make_blobs创建多聚类数据并添加异常点
- 应用孤立森林、LOF、One-Class SVM和椭圆包络
- 分析每种算法的优缺点
练习3:参数调优
- 使用孤立森林进行异常检测
- 调优contamination和n_estimators参数
- 分析参数对检测性能的影响
练习4:高维数据
- 创建一个高维数据集(特征数>20)
- 比较不同算法在高维数据上的表现
- 分析维度诅咒对异常检测的影响
练习5:实际应用
- 模拟一个质量控制场景的数据
- 构建异常检测系统识别缺陷产品
- 评估系统的业务价值和成本效益
13.8 小结
在本章中,我们深入学习了异常检测的各个方面:
核心概念
- 异常类型:点异常、上下文异常、集体异常
- 检测方法:统计方法、机器学习方法
- 评估指标:精确率、召回率、F1得分、AUC
主要算法
- 统计方法:Z-score、IQR、马氏距离、椭圆包络
- 孤立森林:基于随机分割的异常检测
- LOF:基于局部密度的异常检测
- One-Class SVM:基于边界的异常检测
实践技能
- 数据预处理:标准化、特征工程
- 算法选择:根据数据特点选择合适算法
- 参数调优:contamination等关键参数
- 性能评估:多种指标综合评估
关键要点
- 异常检测是无监督学习问题
- 不同算法适用于不同类型的异常
- 业务理解对异常检测至关重要
- 需要平衡检测率和误报率
算法选择建议
使用孤立森林当:
- 数据集较大
- 需要快速检测
- 异常点相对孤立
- 处理高维数据
使用LOF当:
- 需要检测局部异常
- 数据密度变化较大
- 数据集中等规模
- 对检测精度要求高
使用One-Class SVM当:
- 需要非线性边界
- 数据集较小
- 需要明确的决策边界
- 对理论基础有要求
使用椭圆包络当:
- 数据服从高斯分布
- 需要快速检测
- 需要概率解释
- 数据维度不高
实际应用考虑
业务理解
- 明确异常的定义
- 了解异常的业务影响
- 确定可接受的误报率
数据质量
- 处理缺失值和噪声
- 进行适当的特征工程
- 考虑数据的时间性
模型部署
- 考虑实时性要求
- 建立监控和更新机制
- 处理概念漂移问题
13.9 下一步
现在你已经掌握了异常检测这个重要的无监督学习技术!在下一章交叉验证中,我们将学习如何正确评估机器学习模型的性能,避免过拟合和数据泄露等问题。
章节要点回顾:
- ✅ 理解了异常检测的基本概念和应用场景
- ✅ 掌握了统计方法和机器学习方法
- ✅ 学会了不同算法的原理和实现
- ✅ 了解了算法选择的标准和方法
- ✅ 掌握了异常检测的评估指标
- ✅ 能够在实际业务场景中应用异常检测技术