时间序列预测
时间序列预测是机器学习中的重要应用领域,涉及根据历史数据预测未来值。本章将展示如何使用TensorFlow构建各种时间序列预测模型。
项目概述
我们将构建多种时间序列预测模型,包括传统统计方法和深度学习方法,应用于股票价格、天气数据、销售预测等场景。
项目目标
- 理解时间序列数据特征
- 掌握数据预处理和特征工程
- 构建多种预测模型
- 学习模型评估和优化
- 实现实时预测系统
python
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import warnings
warnings.filterwarnings('ignore')
# 设置随机种子
tf.random.set_seed(42)
np.random.seed(42)
print(f"TensorFlow版本: {tf.__version__}")数据准备
生成示例时间序列数据
python
def generate_synthetic_timeseries(n_samples=1000, noise_level=0.1):
"""
生成合成时间序列数据
"""
# 时间索引
time = np.arange(n_samples)
# 趋势分量
trend = 0.02 * time
# 季节性分量
seasonal = 10 * np.sin(2 * np.pi * time / 365.25) + 5 * np.sin(2 * np.pi * time / 30.44)
# 周期性分量
cyclical = 3 * np.sin(2 * np.pi * time / 7)
# 噪声
noise = np.random.normal(0, noise_level * 10, n_samples)
# 组合所有分量
series = 100 + trend + seasonal + cyclical + noise
# 创建DataFrame
dates = pd.date_range(start='2020-01-01', periods=n_samples, freq='D')
df = pd.DataFrame({
'date': dates,
'value': series,
'trend': trend,
'seasonal': seasonal,
'cyclical': cyclical,
'noise': noise
})
return df
def load_stock_data(symbol='AAPL', start_date='2020-01-01', end_date='2023-12-31'):
"""
加载股票数据(示例函数,实际需要API)
"""
# 这里使用模拟数据,实际应用中可以使用yfinance等库
n_days = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days
dates = pd.date_range(start=start_date, end=end_date, freq='D')
# 模拟股票价格
np.random.seed(42)
returns = np.random.normal(0.001, 0.02, len(dates))
prices = [100]
for ret in returns[1:]:
prices.append(prices[-1] * (1 + ret))
df = pd.DataFrame({
'date': dates,
'close': prices[:len(dates)],
'volume': np.random.randint(1000000, 10000000, len(dates))
})
return df
# 生成示例数据
ts_data = generate_synthetic_timeseries(1000)
stock_data = load_stock_data()
print("时间序列数据形状:", ts_data.shape)
print("股票数据形状:", stock_data.shape)数据可视化
python
def plot_timeseries_components(df, value_col='value'):
"""
可视化时间序列组件
"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 原始时间序列
axes[0, 0].plot(df['date'], df[value_col])
axes[0, 0].set_title('原始时间序列')
axes[0, 0].set_xlabel('日期')
axes[0, 0].set_ylabel('值')
# 趋势分量
if 'trend' in df.columns:
axes[0, 1].plot(df['date'], df['trend'])
axes[0, 1].set_title('趋势分量')
axes[0, 1].set_xlabel('日期')
axes[0, 1].set_ylabel('趋势')
# 季节性分量
if 'seasonal' in df.columns:
axes[1, 0].plot(df['date'], df['seasonal'])
axes[1, 0].set_title('季节性分量')
axes[1, 0].set_xlabel('日期')
axes[1, 0].set_ylabel('季节性')
# 噪声分量
if 'noise' in df.columns:
axes[1, 1].plot(df['date'], df['noise'])
axes[1, 1].set_title('噪声分量')
axes[1, 1].set_xlabel('日期')
axes[1, 1].set_ylabel('噪声')
plt.tight_layout()
plt.show()
def plot_timeseries_statistics(df, value_col='value', window=30):
"""
绘制时间序列统计特征
"""
# 计算移动平均和标准差
df['rolling_mean'] = df[value_col].rolling(window=window).mean()
df['rolling_std'] = df[value_col].rolling(window=window).std()
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 原始数据和移动平均
axes[0, 0].plot(df['date'], df[value_col], label='原始数据', alpha=0.7)
axes[0, 0].plot(df['date'], df['rolling_mean'], label=f'{window}日移动平均', color='red')
axes[0, 0].set_title('时间序列和移动平均')
axes[0, 0].legend()
# 移动标准差
axes[0, 1].plot(df['date'], df['rolling_std'])
axes[0, 1].set_title(f'{window}日移动标准差')
# 自相关图
from statsmodels.tsa.stattools import acf
lags = 40
autocorr = acf(df[value_col].dropna(), nlags=lags)
axes[1, 0].bar(range(lags+1), autocorr)
axes[1, 0].set_title('自相关函数')
axes[1, 0].set_xlabel('滞后期')
# 分布直方图
axes[1, 1].hist(df[value_col].dropna(), bins=50, alpha=0.7)
axes[1, 1].set_title('数值分布')
axes[1, 1].set_xlabel('值')
axes[1, 1].set_ylabel('频次')
plt.tight_layout()
plt.show()
# 可视化数据
plot_timeseries_components(ts_data)
plot_timeseries_statistics(ts_data)数据预处理
python
def create_sequences(data, sequence_length, prediction_length=1):
"""
创建用于训练的序列数据
"""
X, y = [], []
for i in range(len(data) - sequence_length - prediction_length + 1):
# 输入序列
X.append(data[i:(i + sequence_length)])
# 目标值
y.append(data[i + sequence_length:i + sequence_length + prediction_length])
return np.array(X), np.array(y)
def prepare_timeseries_data(df, value_col='value', sequence_length=60,
prediction_length=1, test_size=0.2, scale=True):
"""
准备时间序列数据用于训练
"""
# 提取数值
values = df[value_col].values.reshape(-1, 1)
# 数据缩放
scaler = None
if scale:
scaler = MinMaxScaler()
values = scaler.fit_transform(values)
# 创建序列
X, y = create_sequences(values, sequence_length, prediction_length)
# 分割训练和测试集
split_idx = int(len(X) * (1 - test_size))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
print(f"训练集形状: X_train={X_train.shape}, y_train={y_train.shape}")
print(f"测试集形状: X_test={X_test.shape}, y_test={y_test.shape}")
return (X_train, y_train), (X_test, y_test), scaler
def add_features(df, value_col='value'):
"""
添加时间序列特征
"""
df = df.copy()
# 滞后特征
for lag in [1, 2, 3, 7, 14, 30]:
df[f'lag_{lag}'] = df[value_col].shift(lag)
# 移动平均特征
for window in [3, 7, 14, 30]:
df[f'ma_{window}'] = df[value_col].rolling(window=window).mean()
df[f'std_{window}'] = df[value_col].rolling(window=window).std()
# 时间特征
df['day_of_week'] = df['date'].dt.dayofweek
df['day_of_month'] = df['date'].dt.day
df['month'] = df['date'].dt.month
df['quarter'] = df['date'].dt.quarter
df['year'] = df['date'].dt.year
# 差分特征
df['diff_1'] = df[value_col].diff(1)
df['diff_7'] = df[value_col].diff(7)
# 百分比变化
df['pct_change_1'] = df[value_col].pct_change(1)
df['pct_change_7'] = df[value_col].pct_change(7)
return df
# 准备数据
sequence_length = 60
prediction_length = 1
(X_train, y_train), (X_test, y_test), scaler = prepare_timeseries_data(
ts_data, sequence_length=sequence_length, prediction_length=prediction_length
)
# 添加特征
enhanced_data = add_features(ts_data)
print("增强后的特征:", enhanced_data.columns.tolist())模型构建
LSTM模型
python
def create_lstm_model(sequence_length, n_features=1, lstm_units=50,
dense_units=25, dropout_rate=0.2, prediction_length=1):
"""
创建LSTM时间序列预测模型
"""
model = keras.Sequential([
keras.layers.LSTM(lstm_units, return_sequences=True,
input_shape=(sequence_length, n_features)),
keras.layers.Dropout(dropout_rate),
keras.layers.LSTM(lstm_units, return_sequences=False),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(dense_units, activation='relu'),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(prediction_length)
])
return model
def create_bidirectional_lstm_model(sequence_length, n_features=1, lstm_units=50,
dense_units=25, dropout_rate=0.2, prediction_length=1):
"""
创建双向LSTM模型
"""
model = keras.Sequential([
keras.layers.Bidirectional(
keras.layers.LSTM(lstm_units, return_sequences=True),
input_shape=(sequence_length, n_features)
),
keras.layers.Dropout(dropout_rate),
keras.layers.Bidirectional(
keras.layers.LSTM(lstm_units, return_sequences=False)
),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(dense_units, activation='relu'),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(prediction_length)
])
return model
# 创建LSTM模型
lstm_model = create_lstm_model(sequence_length, n_features=1)
bilstm_model = create_bidirectional_lstm_model(sequence_length, n_features=1)
print("LSTM模型结构:")
lstm_model.summary()GRU模型
python
def create_gru_model(sequence_length, n_features=1, gru_units=50,
dense_units=25, dropout_rate=0.2, prediction_length=1):
"""
创建GRU时间序列预测模型
"""
model = keras.Sequential([
keras.layers.GRU(gru_units, return_sequences=True,
input_shape=(sequence_length, n_features)),
keras.layers.Dropout(dropout_rate),
keras.layers.GRU(gru_units, return_sequences=False),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(dense_units, activation='relu'),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(prediction_length)
])
return model
def create_stacked_gru_model(sequence_length, n_features=1, gru_units=[50, 50, 25],
dropout_rate=0.2, prediction_length=1):
"""
创建堆叠GRU模型
"""
model = keras.Sequential()
# 第一层GRU
model.add(keras.layers.GRU(
gru_units[0],
return_sequences=True,
input_shape=(sequence_length, n_features)
))
model.add(keras.layers.Dropout(dropout_rate))
# 中间GRU层
for units in gru_units[1:-1]:
model.add(keras.layers.GRU(units, return_sequences=True))
model.add(keras.layers.Dropout(dropout_rate))
# 最后一层GRU
model.add(keras.layers.GRU(gru_units[-1], return_sequences=False))
model.add(keras.layers.Dropout(dropout_rate))
# 输出层
model.add(keras.layers.Dense(prediction_length))
return model
# 创建GRU模型
gru_model = create_gru_model(sequence_length, n_features=1)
stacked_gru_model = create_stacked_gru_model(sequence_length, n_features=1)CNN-LSTM混合模型
python
def create_cnn_lstm_model(sequence_length, n_features=1, cnn_filters=64,
kernel_size=3, lstm_units=50, dense_units=25,
dropout_rate=0.2, prediction_length=1):
"""
创建CNN-LSTM混合模型
"""
model = keras.Sequential([
# CNN层提取局部特征
keras.layers.Conv1D(cnn_filters, kernel_size, activation='relu',
input_shape=(sequence_length, n_features)),
keras.layers.MaxPooling1D(pool_size=2),
keras.layers.Dropout(dropout_rate),
keras.layers.Conv1D(cnn_filters//2, kernel_size, activation='relu'),
keras.layers.MaxPooling1D(pool_size=2),
keras.layers.Dropout(dropout_rate),
# LSTM层捕获时序依赖
keras.layers.LSTM(lstm_units, return_sequences=False),
keras.layers.Dropout(dropout_rate),
# 全连接层
keras.layers.Dense(dense_units, activation='relu'),
keras.layers.Dropout(dropout_rate),
keras.layers.Dense(prediction_length)
])
return model
def create_attention_lstm_model(sequence_length, n_features=1, lstm_units=50,
attention_units=32, dense_units=25,
dropout_rate=0.2, prediction_length=1):
"""
创建带注意力机制的LSTM模型
"""
# 注意力层
class AttentionLayer(keras.layers.Layer):
def __init__(self, units):
super(AttentionLayer, self).__init__()
self.units = units
self.W1 = keras.layers.Dense(units)
self.W2 = keras.layers.Dense(units)
self.V = keras.layers.Dense(1)
def call(self, query, values):
# query: (batch_size, hidden_size)
# values: (batch_size, time_steps, hidden_size)
# 扩展query维度以匹配values
query_with_time_axis = tf.expand_dims(query, 1)
# 计算注意力分数
score = self.V(tf.nn.tanh(
self.W1(query_with_time_axis) + self.W2(values)
))
# 计算注意力权重
attention_weights = tf.nn.softmax(score, axis=1)
# 计算上下文向量
context_vector = attention_weights * values
context_vector = tf.reduce_sum(context_vector, axis=1)
return context_vector, attention_weights
# 构建模型
inputs = keras.layers.Input(shape=(sequence_length, n_features))
# LSTM层
lstm_out = keras.layers.LSTM(lstm_units, return_sequences=True)(inputs)
lstm_final = keras.layers.LSTM(lstm_units, return_state=True)(lstm_out)
# 注意力机制
attention = AttentionLayer(attention_units)
context_vector, attention_weights = attention(lstm_final[1], lstm_out)
# 输出层
dense = keras.layers.Dense(dense_units, activation='relu')(context_vector)
dropout = keras.layers.Dropout(dropout_rate)(dense)
outputs = keras.layers.Dense(prediction_length)(dropout)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
# 创建混合模型
cnn_lstm_model = create_cnn_lstm_model(sequence_length, n_features=1)
attention_lstm_model = create_attention_lstm_model(sequence_length, n_features=1)
print("CNN-LSTM模型结构:")
cnn_lstm_model.summary()Transformer模型
python
def create_transformer_model(sequence_length, n_features=1, d_model=64,
num_heads=4, ff_dim=128, num_layers=2,
dropout_rate=0.1, prediction_length=1):
"""
创建Transformer时间序列预测模型
"""
inputs = keras.layers.Input(shape=(sequence_length, n_features))
# 投影到d_model维度
x = keras.layers.Dense(d_model)(inputs)
# 位置编码
positions = tf.range(start=0, limit=sequence_length, delta=1)
position_embedding = keras.layers.Embedding(sequence_length, d_model)(positions)
x = x + position_embedding
# Transformer编码器层
for _ in range(num_layers):
# 多头自注意力
attention_output = keras.layers.MultiHeadAttention(
num_heads=num_heads, key_dim=d_model
)(x, x)
# 残差连接和层归一化
x = keras.layers.LayerNormalization()(x + attention_output)
# 前馈网络
ffn_output = keras.layers.Dense(ff_dim, activation='relu')(x)
ffn_output = keras.layers.Dense(d_model)(ffn_output)
# 残差连接和层归一化
x = keras.layers.LayerNormalization()(x + ffn_output)
# Dropout
x = keras.layers.Dropout(dropout_rate)(x)
# 全局平均池化
x = keras.layers.GlobalAveragePooling1D()(x)
# 输出层
x = keras.layers.Dense(64, activation='relu')(x)
x = keras.layers.Dropout(dropout_rate)(x)
outputs = keras.layers.Dense(prediction_length)(x)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
# 创建Transformer模型
transformer_model = create_transformer_model(sequence_length, n_features=1)
print("Transformer模型结构:")
transformer_model.summary()模型训练
训练配置
python
def compile_timeseries_model(model, learning_rate=0.001):
"""
编译时间序列模型
"""
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(
optimizer=optimizer,
loss='mse',
metrics=['mae', 'mape']
)
return model
def create_timeseries_callbacks(model_name, patience=10):
"""
创建时间序列训练回调
"""
callbacks = [
keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=patience,
restore_best_weights=True,
verbose=1
),
keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=1e-7,
verbose=1
),
keras.callbacks.ModelCheckpoint(
f'best_{model_name}.h5',
monitor='val_loss',
save_best_only=True,
verbose=1
)
]
return callbacks
def train_timeseries_model(model, X_train, y_train, X_test, y_test,
epochs=100, batch_size=32, model_name='timeseries'):
"""
训练时间序列模型
"""
# 编译模型
model = compile_timeseries_model(model)
# 创建回调
callbacks = create_timeseries_callbacks(model_name)
# 训练模型
history = model.fit(
X_train, y_train,
epochs=epochs,
batch_size=batch_size,
validation_data=(X_test, y_test),
callbacks=callbacks,
verbose=1
)
return history
# 训练LSTM模型
print("训练LSTM模型...")
lstm_history = train_timeseries_model(
lstm_model, X_train, y_train, X_test, y_test,
epochs=50, model_name='lstm_timeseries'
)训练可视化
python
def plot_training_history(history, title='模型训练历史'):
"""
绘制训练历史
"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 损失
axes[0, 0].plot(history.history['loss'], label='训练损失')
axes[0, 0].plot(history.history['val_loss'], label='验证损失')
axes[0, 0].set_title('模型损失')
axes[0, 0].set_xlabel('Epoch')
axes[0, 0].set_ylabel('MSE')
axes[0, 0].legend()
# MAE
axes[0, 1].plot(history.history['mae'], label='训练MAE')
axes[0, 1].plot(history.history['val_mae'], label='验证MAE')
axes[0, 1].set_title('平均绝对误差')
axes[0, 1].set_xlabel('Epoch')
axes[0, 1].set_ylabel('MAE')
axes[0, 1].legend()
# MAPE
axes[1, 0].plot(history.history['mape'], label='训练MAPE')
axes[1, 0].plot(history.history['val_mape'], label='验证MAPE')
axes[1, 0].set_title('平均绝对百分比误差')
axes[1, 0].set_xlabel('Epoch')
axes[1, 0].set_ylabel('MAPE')
axes[1, 0].legend()
# 学习率(如果有记录)
if 'lr' in history.history:
axes[1, 1].plot(history.history['lr'])
axes[1, 1].set_title('学习率')
axes[1, 1].set_xlabel('Epoch')
axes[1, 1].set_ylabel('学习率')
axes[1, 1].set_yscale('log')
plt.suptitle(title)
plt.tight_layout()
plt.show()
# 可视化训练历史
plot_training_history(lstm_history, 'LSTM模型训练历史')模型评估
预测和评估指标
python
def evaluate_timeseries_model(model, X_test, y_test, scaler=None):
"""
评估时间序列模型
"""
# 预测
y_pred = model.predict(X_test)
# 反缩放(如果使用了缩放)
if scaler is not None:
y_test_orig = scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()
y_pred_orig = scaler.inverse_transform(y_pred.reshape(-1, 1)).flatten()
else:
y_test_orig = y_test.flatten()
y_pred_orig = y_pred.flatten()
# 计算评估指标
mse = mean_squared_error(y_test_orig, y_pred_orig)
mae = mean_absolute_error(y_test_orig, y_pred_orig)
rmse = np.sqrt(mse)
# MAPE (平均绝对百分比误差)
mape = np.mean(np.abs((y_test_orig - y_pred_orig) / y_test_orig)) * 100
# R²分数
r2 = r2_score(y_test_orig, y_pred_orig)
print(f"评估指标:")
print(f"MSE: {mse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"MAPE: {mape:.2f}%")
print(f"R²: {r2:.4f}")
return {
'mse': mse,
'mae': mae,
'rmse': rmse,
'mape': mape,
'r2': r2,
'y_true': y_test_orig,
'y_pred': y_pred_orig
}
def plot_predictions(y_true, y_pred, title='预测结果', n_samples=200):
"""
绘制预测结果
"""
# 只显示部分样本以便观察
if len(y_true) > n_samples:
indices = np.random.choice(len(y_true), n_samples, replace=False)
indices = np.sort(indices)
y_true_plot = y_true[indices]
y_pred_plot = y_pred[indices]
else:
y_true_plot = y_true
y_pred_plot = y_pred
indices = np.arange(len(y_true))
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 时间序列对比
axes[0, 0].plot(indices, y_true_plot, label='真实值', alpha=0.7)
axes[0, 0].plot(indices, y_pred_plot, label='预测值', alpha=0.7)
axes[0, 0].set_title('预测vs真实值')
axes[0, 0].set_xlabel('时间')
axes[0, 0].set_ylabel('值')
axes[0, 0].legend()
# 散点图
axes[0, 1].scatter(y_true, y_pred, alpha=0.5)
min_val = min(y_true.min(), y_pred.min())
max_val = max(y_true.max(), y_pred.max())
axes[0, 1].plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)
axes[0, 1].set_xlabel('真实值')
axes[0, 1].set_ylabel('预测值')
axes[0, 1].set_title('预测vs真实值散点图')
# 残差图
residuals = y_true - y_pred
axes[1, 0].scatter(y_pred, residuals, alpha=0.5)
axes[1, 0].axhline(y=0, color='r', linestyle='--')
axes[1, 0].set_xlabel('预测值')
axes[1, 0].set_ylabel('残差')
axes[1, 0].set_title('残差图')
# 残差分布
axes[1, 1].hist(residuals, bins=30, alpha=0.7)
axes[1, 1].set_xlabel('残差')
axes[1, 1].set_ylabel('频次')
axes[1, 1].set_title('残差分布')
plt.suptitle(title)
plt.tight_layout()
plt.show()
# 评估LSTM模型
lstm_results = evaluate_timeseries_model(lstm_model, X_test, y_test, scaler)
plot_predictions(lstm_results['y_true'], lstm_results['y_pred'], 'LSTM模型预测结果')多步预测
python
def multi_step_prediction(model, initial_sequence, n_steps, scaler=None):
"""
多步预测
"""
predictions = []
current_sequence = initial_sequence.copy()
for _ in range(n_steps):
# 预测下一个值
next_pred = model.predict(current_sequence.reshape(1, -1, 1), verbose=0)
predictions.append(next_pred[0, 0])
# 更新序列(滑动窗口)
current_sequence = np.append(current_sequence[1:], next_pred[0, 0])
predictions = np.array(predictions)
# 反缩放
if scaler is not None:
predictions = scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
return predictions
def plot_multi_step_prediction(model, X_test, y_test, scaler, n_steps=30, sample_idx=0):
"""
可视化多步预测
"""
# 选择一个测试样本
initial_sequence = X_test[sample_idx].flatten()
# 进行多步预测
predictions = multi_step_prediction(model, initial_sequence, n_steps, scaler)
# 获取真实的后续值
if sample_idx + n_steps < len(y_test):
true_values = y_test[sample_idx:sample_idx + n_steps]
if scaler is not None:
true_values = scaler.inverse_transform(true_values.reshape(-1, 1)).flatten()
else:
true_values = None
# 绘图
plt.figure(figsize=(12, 6))
# 历史数据
if scaler is not None:
history = scaler.inverse_transform(initial_sequence.reshape(-1, 1)).flatten()
else:
history = initial_sequence
history_x = np.arange(-len(history), 0)
pred_x = np.arange(0, n_steps)
plt.plot(history_x, history, label='历史数据', color='blue')
plt.plot(pred_x, predictions, label='预测值', color='red', marker='o')
if true_values is not None:
plt.plot(pred_x[:len(true_values)], true_values,
label='真实值', color='green', marker='s')
plt.axvline(x=0, color='black', linestyle='--', alpha=0.5)
plt.xlabel('时间步')
plt.ylabel('值')
plt.title(f'{n_steps}步预测')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 多步预测示例
plot_multi_step_prediction(lstm_model, X_test, y_test, scaler, n_steps=30)模型比较
多模型对比
python
def compare_models(models, model_names, X_train, y_train, X_test, y_test, scaler=None):
"""
比较多个模型的性能
"""
results = {}
for model, name in zip(models, model_names):
print(f"\n训练 {name} 模型...")
# 编译和训练模型
model = compile_timeseries_model(model)
# 简化训练(减少epoch以节省时间)
history = model.fit(
X_train, y_train,
epochs=20,
batch_size=32,
validation_data=(X_test, y_test),
verbose=0
)
# 评估模型
model_results = evaluate_timeseries_model(model, X_test, y_test, scaler)
results[name] = model_results
return results
def plot_model_comparison(results):
"""
可视化模型比较结果
"""
model_names = list(results.keys())
metrics = ['mse', 'mae', 'rmse', 'mape', 'r2']
fig, axes = plt.subplots(2, 3, figsize=(18, 10))
axes = axes.flatten()
for i, metric in enumerate(metrics):
values = [results[name][metric] for name in model_names]
bars = axes[i].bar(model_names, values)
axes[i].set_title(f'{metric.upper()}')
axes[i].set_ylabel(metric.upper())
# 添加数值标签
for bar, value in zip(bars, values):
height = bar.get_height()
axes[i].text(bar.get_x() + bar.get_width()/2., height,
f'{value:.3f}', ha='center', va='bottom')
# 旋转x轴标签
axes[i].tick_params(axis='x', rotation=45)
# 隐藏最后一个子图
axes[-1].axis('off')
plt.tight_layout()
plt.show()
# 创建多个模型进行比较
models_to_compare = [
create_lstm_model(sequence_length, n_features=1),
create_gru_model(sequence_length, n_features=1),
create_cnn_lstm_model(sequence_length, n_features=1),
]
model_names = ['LSTM', 'GRU', 'CNN-LSTM']
# 比较模型(注意:这会重新训练模型)
# comparison_results = compare_models(models_to_compare, model_names, X_train, y_train, X_test, y_test, scaler)
# plot_model_comparison(comparison_results)实时预测系统
在线预测
python
class TimeSeriesPredictor:
"""
时间序列预测器类
"""
def __init__(self, model, scaler, sequence_length):
self.model = model
self.scaler = scaler
self.sequence_length = sequence_length
self.history = []
def add_data_point(self, value):
"""
添加新的数据点
"""
self.history.append(value)
# 保持历史数据长度
if len(self.history) > self.sequence_length * 2:
self.history = self.history[-self.sequence_length * 2:]
def predict_next(self):
"""
预测下一个值
"""
if len(self.history) < self.sequence_length:
raise ValueError(f"需要至少 {self.sequence_length} 个历史数据点")
# 准备输入序列
sequence = np.array(self.history[-self.sequence_length:])
# 缩放
if self.scaler is not None:
sequence_scaled = self.scaler.transform(sequence.reshape(-1, 1)).flatten()
else:
sequence_scaled = sequence
# 预测
prediction_scaled = self.model.predict(
sequence_scaled.reshape(1, self.sequence_length, 1),
verbose=0
)[0, 0]
# 反缩放
if self.scaler is not None:
prediction = self.scaler.inverse_transform([[prediction_scaled]])[0, 0]
else:
prediction = prediction_scaled
return prediction
def predict_multiple(self, n_steps):
"""
预测多个步骤
"""
predictions = []
for _ in range(n_steps):
pred = self.predict_next()
predictions.append(pred)
self.add_data_point(pred) # 将预测值添加到历史中
return predictions
def simulate_real_time_prediction(predictor, test_data, n_predictions=50):
"""
模拟实时预测
"""
predictions = []
true_values = []
# 初始化历史数据
for i in range(predictor.sequence_length):
predictor.add_data_point(test_data[i])
# 进行实时预测
for i in range(predictor.sequence_length,
min(len(test_data), predictor.sequence_length + n_predictions)):
# 预测
pred = predictor.predict_next()
predictions.append(pred)
# 真实值
true_val = test_data[i]
true_values.append(true_val)
# 添加真实值到历史(模拟实时数据到达)
predictor.add_data_point(true_val)
return np.array(predictions), np.array(true_values)
# 创建预测器
predictor = TimeSeriesPredictor(lstm_model, scaler, sequence_length)
# 模拟实时预测
if scaler is not None:
test_data_orig = scaler.inverse_transform(
ts_data['value'].values[-200:].reshape(-1, 1)
).flatten()
else:
test_data_orig = ts_data['value'].values[-200:]
rt_predictions, rt_true_values = simulate_real_time_prediction(
predictor, test_data_orig, n_predictions=50
)
# 可视化实时预测结果
plt.figure(figsize=(12, 6))
plt.plot(rt_true_values, label='真实值', marker='o')
plt.plot(rt_predictions, label='预测值', marker='s')
plt.xlabel('时间步')
plt.ylabel('值')
plt.title('实时预测结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
# 计算实时预测误差
rt_mae = mean_absolute_error(rt_true_values, rt_predictions)
rt_mape = np.mean(np.abs((rt_true_values - rt_predictions) / rt_true_values)) * 100
print(f"实时预测 MAE: {rt_mae:.4f}")
print(f"实时预测 MAPE: {rt_mape:.2f}%")模型部署
保存和加载模型
python
def save_timeseries_model(model, scaler, model_path):
"""
保存时间序列模型和预处理器
"""
import pickle
# 保存模型
model.save(f'{model_path}.h5')
# 保存缩放器
if scaler is not None:
with open(f'{model_path}_scaler.pkl', 'wb') as f:
pickle.dump(scaler, f)
print(f"模型已保存到: {model_path}")
def load_timeseries_model(model_path):
"""
加载时间序列模型和预处理器
"""
import pickle
# 加载模型
model = keras.models.load_model(f'{model_path}.h5')
# 加载缩放器
try:
with open(f'{model_path}_scaler.pkl', 'rb') as f:
scaler = pickle.load(f)
except FileNotFoundError:
scaler = None
return model, scaler
# 保存模型
save_timeseries_model(lstm_model, scaler, 'lstm_timeseries_model')Web API部署
python
def create_timeseries_api(model, scaler, sequence_length):
"""
创建时间序列预测Web API
"""
from flask import Flask, request, jsonify
app = Flask(__name__)
predictor = TimeSeriesPredictor(model, scaler, sequence_length)
@app.route('/predict', methods=['POST'])
def predict():
try:
data = request.get_json()
if 'sequence' not in data:
return jsonify({'error': '缺少sequence字段'}), 400
sequence = data['sequence']
if len(sequence) < sequence_length:
return jsonify({
'error': f'序列长度至少需要 {sequence_length}'
}), 400
# 设置历史数据
predictor.history = sequence[-sequence_length:]
# 预测
prediction = predictor.predict_next()
return jsonify({
'prediction': float(prediction),
'sequence_length': len(sequence)
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/predict_multiple', methods=['POST'])
def predict_multiple():
try:
data = request.get_json()
if 'sequence' not in data or 'n_steps' not in data:
return jsonify({'error': '缺少必要字段'}), 400
sequence = data['sequence']
n_steps = data['n_steps']
if len(sequence) < sequence_length:
return jsonify({
'error': f'序列长度至少需要 {sequence_length}'
}), 400
# 设置历史数据
predictor.history = sequence[-sequence_length:]
# 多步预测
predictions = predictor.predict_multiple(n_steps)
return jsonify({
'predictions': [float(p) for p in predictions],
'n_steps': n_steps
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'healthy'})
return app
# 创建API
# api_app = create_timeseries_api(lstm_model, scaler, sequence_length)
# api_app.run(host='0.0.0.0', port=5000, debug=True)总结
本章通过完整的时间序列预测项目,展示了深度学习在时序数据分析中的应用:
关键要点:
- 数据理解:时间序列的趋势、季节性、周期性分析
- 特征工程:滞后特征、移动平均、时间特征
- 模型选择:LSTM、GRU、CNN-LSTM、Transformer等
- 评估指标:MSE、MAE、MAPE、R²等时序专用指标
- 多步预测:递归预测和直接预测策略
- 实时系统:在线预测和流式处理
- 模型部署:API服务和生产环境考虑
最佳实践:
- 充分分析时间序列特征
- 选择合适的序列长度和预测窗口
- 使用适当的数据缩放和归一化
- 考虑模型的计算复杂度和预测延迟
- 实施模型监控和自动重训练
- 处理缺失值和异常值
- 考虑外部因素和多变量预测
时间序列预测在金融、零售、制造业等领域有广泛应用,掌握这些技术对于数据科学家来说非常重要。