Skip to content

PyTorch 时间序列预测

时间序列预测概述

时间序列预测是根据历史数据预测未来值的重要任务。PyTorch提供了强大的工具来构建各种时间序列预测模型,从简单的LSTM到复杂的Transformer架构。

python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
from torch.utils.data import Dataset, DataLoader

数据预处理

1. 时间序列数据集类

python
class TimeSeriesDataset(Dataset):
    def __init__(self, data, sequence_length, prediction_length=1):
        """
        Sliding-window dataset over a time series.

        Args:
            data: time-series values (numpy array; 1-D for a univariate
                series, or 2-D [time_steps, features]).
            sequence_length: length of each input window.
            prediction_length: number of future steps in each target.
        """
        self.data = data
        self.sequence_length = sequence_length
        self.prediction_length = prediction_length

    def __len__(self):
        # Number of complete (input window, target window) pairs.
        return len(self.data) - self.sequence_length - self.prediction_length + 1

    def __getitem__(self, idx):
        # Input window of `sequence_length` consecutive steps.
        x = torch.FloatTensor(self.data[idx:idx + self.sequence_length])
        # Target window immediately following the input window.
        y = torch.FloatTensor(
            self.data[idx + self.sequence_length:
                      idx + self.sequence_length + self.prediction_length]
        )
        # Fix: a 1-D (univariate) series must gain a trailing feature
        # dimension, otherwise batches come out as (batch, seq_len) and
        # cannot feed the nn.LSTM/nn.GRU models built with input_size=1.
        if x.dim() == 1:
            x = x.unsqueeze(-1)
        return x, y

def create_time_series_data(data, sequence_length=60, prediction_length=1, 
                           train_ratio=0.8, val_ratio=0.1):
    """Split a series chronologically into scaled train/val/test datasets.

    Fix: the scaler is now fitted on the *training* portion only, so no
    information from the validation/test periods leaks into the
    normalization (the original fitted MinMaxScaler on the full series,
    a classic look-ahead bias).

    Args:
        data: 1-D numpy array of raw series values.
        sequence_length: input window length for each sample.
        prediction_length: number of future steps per target.
        train_ratio: fraction of the series used for training.
        val_ratio: fraction used for validation (the rest is test).

    Returns:
        (train_dataset, val_dataset, test_dataset, scaler)
    """
    # Chronological split points (no shuffling for time series).
    total_len = len(data)
    train_len = int(total_len * train_ratio)
    val_len = int(total_len * val_ratio)

    # Fit on the training period only, then transform everything.
    scaler = MinMaxScaler()
    scaler.fit(data[:train_len].reshape(-1, 1))
    scaled_data = scaler.transform(data.reshape(-1, 1)).flatten()

    train_data = scaled_data[:train_len]
    val_data = scaled_data[train_len:train_len + val_len]
    test_data = scaled_data[train_len + val_len:]

    train_dataset = TimeSeriesDataset(train_data, sequence_length, prediction_length)
    val_dataset = TimeSeriesDataset(val_data, sequence_length, prediction_length)
    test_dataset = TimeSeriesDataset(test_data, sequence_length, prediction_length)

    return train_dataset, val_dataset, test_dataset, scaler

# 生成示例数据
def generate_sine_wave_data(length=1000, frequency=0.1, noise_level=0.1, seed=None):
    """Generate a noisy sine wave for forecasting experiments.

    Args:
        length: number of samples to generate.
        frequency: cycles per sample; t spans [0, length * frequency].
        noise_level: standard deviation of the additive Gaussian noise.
        seed: optional RNG seed for reproducible data; None keeps the
            original non-deterministic behavior (backward compatible).

    Returns:
        1-D numpy array of `length` samples.
    """
    # Use a dedicated, seeded Generator only when reproducibility is asked for.
    rng = np.random if seed is None else np.random.default_rng(seed)
    t = np.linspace(0, length * frequency, length)
    return np.sin(2 * np.pi * t) + noise_level * rng.standard_normal(length)

def generate_stock_like_data(length=1000, trend=0.001, volatility=0.02, seed=None):
    """Simulate a price-like series via a geometric random walk.

    Args:
        length: number of samples.
        trend: mean of the per-step log-return.
        volatility: standard deviation of the per-step log-return.
        seed: optional RNG seed for reproducible data; None keeps the
            original non-deterministic behavior (backward compatible).

    Returns:
        1-D numpy array of strictly positive "prices" starting near 100.
    """
    rng = np.random if seed is None else np.random.default_rng(seed)
    returns = rng.normal(trend, volatility, length)
    # Exponentiating cumulative log-returns keeps prices positive.
    return np.exp(np.cumsum(returns)) * 100

# Example data: a noisy sine wave and a simulated price series.
sine_data = generate_sine_wave_data(1000)
stock_data = generate_stock_like_data(1000)

# Build train/val/test datasets from the sine wave
# (60-step input windows, 1-step-ahead targets).
train_dataset, val_dataset, test_dataset, scaler = create_time_series_data(
    sine_data, sequence_length=60, prediction_length=1
)

print(f"训练集大小: {len(train_dataset)}")
print(f"验证集大小: {len(val_dataset)}")
print(f"测试集大小: {len(test_dataset)}")

2. 多变量时间序列

python
class MultiVariateTimeSeriesDataset(Dataset):
    def __init__(self, data, sequence_length, prediction_length=1, target_column=0):
        """Sliding-window dataset for a multivariate series.

        Args:
            data: numpy array of shape [time_steps, features].
            sequence_length: length of each input window.
            prediction_length: number of future steps in each target.
            target_column: index of the feature used as the target.
        """
        self.data = data
        self.sequence_length = sequence_length
        self.prediction_length = prediction_length
        self.target_column = target_column

    def __len__(self):
        # One sample per position where a full input+target window fits.
        window = self.sequence_length + self.prediction_length
        return len(self.data) - window + 1

    def __getitem__(self, idx):
        split = idx + self.sequence_length
        # Inputs keep every feature; targets keep only the target column.
        features = self.data[idx:split]
        targets = self.data[split:split + self.prediction_length,
                            self.target_column]
        return torch.FloatTensor(features), torch.FloatTensor(targets)

def create_multivariate_data(n_samples=1000, n_features=5):
    """Create correlated multivariate time-series data.

    Every feature follows one shared slow trend plus its own
    phase-shifted sine component and Gaussian noise, so the columns
    are correlated but not identical.

    Returns:
        numpy array of shape (n_samples, n_features).
    """
    t = np.linspace(0, 10, n_samples)

    # Slow trend shared by every feature.
    trend = 0.1 * t + np.sin(0.5 * t)

    columns = []
    for col in range(n_features):
        # Each column gets a different phase shift and fresh noise.
        phase = col * np.pi / n_features
        noise = 0.1 * np.random.randn(n_samples)
        columns.append(trend + 0.5 * np.sin(t + phase) + noise)

    return np.column_stack(columns)

# Build a 1000-step, 5-feature series and wrap it in a dataset that
# predicts feature 0 one step ahead from 60-step windows.
multivar_data = create_multivariate_data(1000, 5)
multivar_dataset = MultiVariateTimeSeriesDataset(
    multivar_data, sequence_length=60, prediction_length=1, target_column=0
)

LSTM时间序列模型

1. 基础LSTM模型

python
class LSTMPredictor(nn.Module):
    """Plain LSTM regressor: encodes a window, predicts from its last step."""

    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=1, dropout=0.2):
        super(LSTMPredictor, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Stacked LSTM; inter-layer dropout only applies with >1 layer.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, output_size)."""
        sequence_states, _ = self.lstm(x)
        # Keep only the representation of the final time step.
        final_state = sequence_states[:, -1, :]
        return self.fc(self.dropout(final_state))

# Instantiate the baseline LSTM model.
model = LSTMPredictor(input_size=1, hidden_size=50, num_layers=2, output_size=1)

# Smoke-test with a random batch to confirm the input/output shapes.
sample_input = torch.randn(32, 60, 1)  # (batch_size, sequence_length, input_size)
sample_output = model(sample_input)
print(f"输入形状: {sample_input.shape}")
print(f"输出形状: {sample_output.shape}")

2. 双向LSTM模型

python
class BiLSTMPredictor(nn.Module):
    """Bidirectional LSTM with additive attention pooling over time."""

    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=1, dropout=0.2):
        super(BiLSTMPredictor, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True,
        )

        # Scores each time step; forward+backward doubles the width.
        self.attention = nn.Linear(hidden_size * 2, 1)

        self.fc = nn.Linear(hidden_size * 2, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, output_size)."""
        states, _ = self.lstm(x)  # (batch, seq_len, hidden_size * 2)

        # Attention-weighted average over the time dimension.
        scores = self.attention(states)
        weights = torch.softmax(scores, dim=1)
        pooled = (weights * states).sum(dim=1)

        return self.fc(self.dropout(pooled))

3. 多步预测LSTM

python
class MultiStepLSTM(nn.Module):
    """Encoder-decoder LSTM that autoregressively emits several steps.

    The encoder consumes the observed window; the decoder starts from a
    zero input and feeds each prediction back in as its next input.
    """

    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=10, dropout=0.2):
        super(MultiStepLSTM, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size

        inter_layer_dropout = dropout if num_layers > 1 else 0

        # Encoder over the observed history.
        self.encoder_lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )

        # Decoder unrolls one scalar prediction at a time.
        self.decoder_lstm = nn.LSTM(
            input_size=1,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x, target_length=None):
        """x: (batch, seq_len, input_size) -> (batch, target_length)."""
        if target_length is None:
            target_length = self.output_size

        # Encode the history and hand its final state to the decoder.
        _, state = self.encoder_lstm(x)

        step_input = x.new_zeros(x.size(0), 1, 1)
        predictions = []
        for _ in range(target_length):
            step_out, state = self.decoder_lstm(step_input, state)
            step_pred = self.fc(self.dropout(step_out))
            predictions.append(step_pred)
            # Feed the prediction back in as the next decoder input.
            step_input = step_pred

        return torch.cat(predictions, dim=1).squeeze(-1)

GRU时间序列模型

1. GRU预测器

python
class GRUPredictor(nn.Module):
    """GRU regressor: encodes a window and predicts from its last step."""

    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=1, dropout=0.2):
        super(GRUPredictor, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Stacked GRU; inter-layer dropout requires more than one layer.
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, output_size)."""
        sequence_states, _ = self.gru(x)
        # Only the final time step's representation feeds the head.
        final_state = sequence_states[:, -1, :]
        return self.fc(self.dropout(final_state))

Transformer时间序列模型

1. 时间序列Transformer

python
class TimeSeriesTransformer(nn.Module):
    """Transformer-encoder regressor for time series.

    Projects inputs to d_model, adds sinusoidal positional encodings
    (see PositionalEncoding below), runs a stack of encoder layers and
    predicts from the last time step's representation.
    """

    def __init__(self, input_size=1, d_model=64, nhead=8, num_layers=6, 
                 output_size=1, max_seq_length=1000, dropout=0.1):
        super(TimeSeriesTransformer, self).__init__()

        self.d_model = d_model
        self.input_projection = nn.Linear(input_size, d_model)

        # Sinusoidal position information.
        self.pos_encoding = PositionalEncoding(d_model, max_seq_length)

        # Standard batch-first Transformer encoder stack.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model * 4,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

        self.output_projection = nn.Linear(d_model, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, output_size)."""
        # Scale embeddings by sqrt(d_model) as in "Attention Is All You
        # Need". Fix: the original called math.sqrt(), but `math` is never
        # imported in this document; an exponent avoids the NameError.
        x = self.input_projection(x) * (self.d_model ** 0.5)

        x = self.pos_encoding(x)
        encoded = self.transformer(x)

        # Predict from the final time step's representation.
        last_output = encoded[:, -1, :]
        output = self.dropout(last_output)
        return self.output_projection(output)

class PositionalEncoding(nn.Module):
    """Classic sinusoidal positional encoding (Vaswani et al., 2017)."""

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Fix: the original used math.log(), but `math` is never imported
        # in this document; compute the frequency scale with torch instead.
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * (-torch.log(torch.tensor(10000.0)) / d_model)
        )

        # Even channels get sine, odd channels get cosine
        # (assumes an even d_model, as did the original).
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Stored as (1, max_len, d_model) so it broadcasts over batch-first
        # inputs directly; the original kept a (max_len, 1, d_model) buffer
        # and transposed twice per forward call. NOTE(review): the buffer
        # shape in saved state_dicts changes accordingly.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """x: (batch, seq_len, d_model) -> same shape with positions added."""
        return x + self.pe[:, :x.size(1), :]

训练框架

1. 时间序列训练器

python
class TimeSeriesTrainer:
    """Generic train/validate loop for the forecasting models above.

    Uses MSE loss, Adam, ReduceLROnPlateau scheduling and gradient
    clipping, and checkpoints the best model (lowest validation loss)
    to 'best_model.pth'.
    """

    def __init__(self, model, train_loader, val_loader, device, learning_rate=0.001):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.device = device
        
        # Loss function and optimizer.
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        # Halve the LR after 10 epochs without validation improvement.
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', patience=10, factor=0.5
        )
        
        # Training history.
        self.train_losses = []
        self.val_losses = []
        self.best_val_loss = float('inf')
        
    def train_epoch(self):
        """Run one training epoch; returns the mean batch loss."""
        self.model.train()
        total_loss = 0
        
        for batch_idx, (data, target) in enumerate(self.train_loader):
            data, target = data.to(self.device), target.to(self.device)
            
            self.optimizer.zero_grad()
            
            # Forward pass.
            output = self.model(data)
            
            # Align output/target ranks so MSELoss does not broadcast
            # unexpectedly (e.g. (batch, 1) vs (batch,)).
            if output.dim() != target.dim():
                if target.dim() == 1:
                    target = target.unsqueeze(-1)
                elif output.dim() == 1:
                    output = output.unsqueeze(-1)
            
            loss = self.criterion(output, target)
            
            # Backward pass.
            loss.backward()
            
            # Clip gradients to stabilize recurrent-network training.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            
            self.optimizer.step()
            
            total_loss += loss.item()
        
        return total_loss / len(self.train_loader)
    
    def validate_epoch(self):
        """Run one validation pass; returns the mean batch loss."""
        self.model.eval()
        total_loss = 0
        
        with torch.no_grad():
            for data, target in self.val_loader:
                data, target = data.to(self.device), target.to(self.device)
                
                output = self.model(data)
                
                # Same rank alignment as in train_epoch.
                if output.dim() != target.dim():
                    if target.dim() == 1:
                        target = target.unsqueeze(-1)
                    elif output.dim() == 1:
                        output = output.unsqueeze(-1)
                
                loss = self.criterion(output, target)
                total_loss += loss.item()
        
        return total_loss / len(self.val_loader)
    
    def train(self, num_epochs):
        """Full training loop; returns (train_losses, val_losses)."""
        print(f"开始训练,共{num_epochs}个epoch")
        
        for epoch in range(num_epochs):
            # Train.
            train_loss = self.train_epoch()
            
            # Validate.
            val_loss = self.validate_epoch()
            
            # Step the plateau scheduler on the validation loss.
            self.scheduler.step(val_loss)
            
            # Record history.
            self.train_losses.append(train_loss)
            self.val_losses.append(val_loss)
            
            # Checkpoint whenever validation improves.
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                torch.save(self.model.state_dict(), 'best_model.pth')
            
            # Progress report every 10 epochs.
            if (epoch + 1) % 10 == 0:
                print(f'Epoch {epoch+1}/{num_epochs}:')
                print(f'  Train Loss: {train_loss:.6f}')
                print(f'  Val Loss: {val_loss:.6f}')
                print(f'  LR: {self.optimizer.param_groups[0]["lr"]:.8f}')
        
        print(f'训练完成! 最佳验证损失: {self.best_val_loss:.6f}')
        
        return self.train_losses, self.val_losses

# Usage example: train the baseline LSTM on the sine-wave datasets.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data loaders (shuffle only the training windows).
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Model.
model = LSTMPredictor(input_size=1, hidden_size=50, num_layers=2, output_size=1)

# Trainer (MSE + Adam + plateau LR scheduling, see TimeSeriesTrainer).
trainer = TimeSeriesTrainer(model, train_loader, val_loader, device)

# Train for 100 epochs; best weights are saved to 'best_model.pth'.
train_losses, val_losses = trainer.train(num_epochs=100)

模型评估

1. 预测和评估

python
def evaluate_model(model, test_loader, scaler, device):
    """Evaluate a trained model on a test loader.

    Runs the model over `test_loader`, inverse-transforms predictions
    and targets back to the original scale with `scaler`, and reports
    MSE / MAE / RMSE / MAPE.

    Args:
        model: trained nn.Module mapping input batches to predictions.
        test_loader: DataLoader yielding (data, target) batches.
        scaler: fitted scaler exposing inverse_transform().
        device: torch.device for inference.

    Returns:
        (predictions, actuals, metrics_dict) — 1-D numpy arrays plus a
        dict with keys 'mse', 'mae', 'rmse', 'mape'.
    """
    model.eval()
    predictions = []
    actuals = []

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            predictions.extend(output.cpu().numpy())
            actuals.extend(target.cpu().numpy())

    predictions = np.array(predictions)
    actuals = np.array(actuals)

    # Undo the scaling so the metrics are in original units.
    predictions = scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
    actuals = scaler.inverse_transform(actuals.reshape(-1, 1)).flatten()

    # Computed directly with numpy (identical to sklearn's
    # mean_squared_error / mean_absolute_error on 1-D arrays).
    errors = actuals - predictions
    mse = float(np.mean(errors ** 2))
    mae = float(np.mean(np.abs(errors)))
    rmse = float(np.sqrt(mse))

    # Fix: guard MAPE against division by zero — the original produced
    # inf/nan whenever an actual value was exactly 0.
    nonzero = actuals != 0
    if nonzero.any():
        mape = float(np.mean(np.abs(errors[nonzero] / actuals[nonzero])) * 100)
    else:
        mape = float('nan')

    print(f"评估结果:")
    print(f"  MSE: {mse:.6f}")
    print(f"  MAE: {mae:.6f}")
    print(f"  RMSE: {rmse:.6f}")
    print(f"  MAPE: {mape:.2f}%")

    return predictions, actuals, {
        'mse': mse, 'mae': mae, 'rmse': rmse, 'mape': mape
    }

# Evaluate on the held-out test split using the best checkpoint
# written during training.
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
model.load_state_dict(torch.load('best_model.pth'))

predictions, actuals, metrics = evaluate_model(model, test_loader, scaler, device)

2. 可视化结果

python
def plot_predictions(actuals, predictions, title="时间序列预测结果"):
    """Plot actual vs. predicted values (first 200 points at most)."""
    plt.figure(figsize=(15, 6))

    # Limit to the first 200 points so the two curves stay readable.
    n_points = min(200, len(actuals))
    steps = range(n_points)

    plt.plot(steps, actuals[:n_points], label='实际值', color='blue', alpha=0.7)
    plt.plot(steps, predictions[:n_points], label='预测值', color='red', alpha=0.7)

    plt.title(title)
    plt.xlabel('时间步')
    plt.ylabel('值')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

def plot_training_history(train_losses, val_losses):
    """Plot train/val loss curves on linear and log scales side by side."""
    plt.figure(figsize=(12, 4))

    # (subplot title, y-axis label, use log scale) for each panel.
    panels = [
        ('训练历史', 'Loss', False),
        ('训练历史 (对数尺度)', 'Loss (log scale)', True),
    ]
    for pos, (panel_title, y_label, log_scale) in enumerate(panels, start=1):
        plt.subplot(1, 2, pos)
        plt.plot(train_losses, label='训练损失')
        plt.plot(val_losses, label='验证损失')
        plt.title(panel_title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        if log_scale:
            plt.yscale('log')
        plt.legend()
        plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

# Visualize the test-set predictions and the loss curves from training.
plot_predictions(actuals, predictions)
plot_training_history(train_losses, val_losses)

高级技术

1. 注意力机制

python
class AttentionLSTM(nn.Module):
    """LSTM whose time steps are pooled by a learned attention module.

    forward() returns both the prediction and the attention weights so
    the weights can be inspected or visualized.
    """

    def __init__(self, input_size=1, hidden_size=50, num_layers=2, 
                 output_size=1, dropout=0.2):
        super(AttentionLSTM, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )

        # Small MLP that assigns a scalar score to every time step.
        self.attention = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1),
        )

        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> ((batch, output_size), weights)."""
        states, _ = self.lstm(x)  # (batch, seq_len, hidden_size)

        # Normalize per-step scores into weights over the sequence.
        weights = torch.softmax(self.attention(states), dim=1)
        pooled = (weights * states).sum(dim=1)  # (batch, hidden_size)

        prediction = self.fc(self.dropout(pooled))
        return prediction, weights

2. 残差连接

python
class ResidualLSTM(nn.Module):
    """Stack of single-layer LSTMs joined by residual connections
    and layer normalization."""

    def __init__(self, input_size=1, hidden_size=50, num_layers=4, 
                 output_size=1, dropout=0.2):
        super(ResidualLSTM, self).__init__()

        # Project inputs to hidden_size so residual additions line up.
        self.input_projection = nn.Linear(input_size, hidden_size)

        # One single-layer LSTM plus one LayerNorm per residual stage.
        self.lstm_layers = nn.ModuleList(
            nn.LSTM(hidden_size, hidden_size, 1, batch_first=True, dropout=0)
            for _ in range(num_layers)
        )
        self.layer_norms = nn.ModuleList(
            nn.LayerNorm(hidden_size) for _ in range(num_layers)
        )

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, output_size)."""
        hidden = self.input_projection(x)

        for lstm, norm in zip(self.lstm_layers, self.layer_norms):
            layer_out, _ = lstm(hidden)
            # Add the residual, normalize, then regularize.
            hidden = self.dropout(norm(layer_out + hidden))

        # Predict from the final time step.
        return self.fc(hidden[:, -1, :])

3. 多尺度特征提取

python
class MultiScaleLSTM(nn.Module):
    """LSTMs run on differently down-sampled views of the input, with
    their final states fused into one prediction."""

    def __init__(self, input_size=1, hidden_size=50, output_size=1, 
                 scales=(1, 3, 5), dropout=0.2):
        """
        Args:
            input_size: features per time step.
            hidden_size: hidden width of each per-scale LSTM.
            output_size: prediction width.
            scales: down-sampling strides, one LSTM branch per stride.
                Fix: the original used a mutable list `[1, 3, 5]` as the
                default argument; a tuple avoids shared-state surprises.
            dropout: dropout rate (also used between LSTM layers).
        """
        super(MultiScaleLSTM, self).__init__()

        self.scales = list(scales)

        # One two-layer LSTM per scale.
        self.lstm_layers = nn.ModuleList(
            nn.LSTM(input_size, hidden_size, 2, batch_first=True, dropout=dropout)
            for _ in self.scales
        )

        # Fuse the concatenated per-scale features.
        self.fusion = nn.Linear(len(self.scales) * hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, output_size)."""
        scale_outputs = []
        for scale, lstm in zip(self.scales, self.lstm_layers):
            # Naive down-sampling: keep every `scale`-th time step.
            sampled_x = x[:, ::scale, :] if scale > 1 else x
            lstm_out, _ = lstm(sampled_x)
            scale_outputs.append(lstm_out[:, -1, :])

        # Fuse, activate, regularize, predict.
        fused = torch.relu(self.fusion(torch.cat(scale_outputs, dim=1)))
        return self.fc(self.dropout(fused))

实际应用示例

1. 股价预测

python
def create_stock_prediction_pipeline():
    """End-to-end demo: simulate prices, train AttentionLSTM, evaluate.

    Returns:
        (model, predictions, actuals, metrics)
    """
    
    # Simulate a 2000-step price series.
    stock_data = generate_stock_like_data(2000, trend=0.0005, volatility=0.02)
    
    # Chronological train/val/test split with scaling.
    train_dataset, val_dataset, test_dataset, scaler = create_time_series_data(
        stock_data, sequence_length=60, prediction_length=1
    )
    
    # Data loaders (only training windows are shuffled).
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
    
    # Attention-pooled LSTM model.
    model = AttentionLSTM(input_size=1, hidden_size=64, num_layers=3, output_size=1)
    
    # Train (TimeSeriesTrainer checkpoints to 'best_model.pth').
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    trainer = TimeSeriesTrainer(model, train_loader, val_loader, device, learning_rate=0.001)
    
    train_losses, val_losses = trainer.train(num_epochs=100)
    
    # Reload the best checkpoint, then evaluate on the test split.
    model.load_state_dict(torch.load('best_model.pth'))
    predictions, actuals, metrics = evaluate_model(model, test_loader, scaler, device)
    
    return model, predictions, actuals, metrics

# 运行股价预测示例
# model, predictions, actuals, metrics = create_stock_prediction_pipeline()

2. 多步预测

python
def multi_step_prediction_example():
    """Multi-step prediction demo: 60-step inputs, 10-step targets.

    Returns:
        (trained model, fitted scaler)
    """
    
    # Dataset whose targets span several future steps.
    class MultiStepDataset(Dataset):
        def __init__(self, data, input_length, output_length):
            self.data = data
            self.input_length = input_length
            self.output_length = output_length
            
        def __len__(self):
            # Number of positions where a full input+target window fits.
            return len(self.data) - self.input_length - self.output_length + 1
        
        def __getitem__(self, idx):
            x = self.data[idx:idx + self.input_length]
            y = self.data[idx + self.input_length:idx + self.input_length + self.output_length]
            return torch.FloatTensor(x), torch.FloatTensor(y)
    
    # Generate and scale a sine-wave series.
    data = generate_sine_wave_data(1000)
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(data.reshape(-1, 1)).flatten()
    
    # 80/20 split. NOTE(review): random_split shuffles the windows, so
    # validation windows overlap training ones — fine for a demo, but a
    # chronological split would be stricter for time series.
    dataset = MultiStepDataset(scaled_data, input_length=60, output_length=10)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    
    # Encoder-decoder model that emits 10 future steps.
    model = MultiStepLSTM(input_size=1, hidden_size=64, num_layers=2, output_size=10)
    
    # Train with the shared trainer.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    trainer = TimeSeriesTrainer(model, train_loader, val_loader, device)
    
    train_losses, val_losses = trainer.train(num_epochs=50)
    
    return model, scaler

# 运行多步预测示例
# multi_step_model, multi_step_scaler = multi_step_prediction_example()

总结

时间序列预测是PyTorch的重要应用领域,本章介绍了:

  1. 数据预处理:时间序列数据集的创建和预处理技术
  2. 经典模型:LSTM、GRU等循环神经网络模型
  3. 现代架构:Transformer等注意力机制模型
  4. 训练框架:完整的训练、验证、评估流程
  5. 高级技术:注意力机制、残差连接、多尺度特征提取
  6. 实际应用:股价预测、多步预测等具体案例

掌握这些技术将帮助你在金融预测、需求预测、异常检测等时间序列相关任务中取得成功!

本站内容仅供学习和研究使用。