
PyTorch Recurrent Neural Networks

Introduction to Recurrent Neural Networks

A Recurrent Neural Network (RNN) is a neural network architecture designed specifically for sequential data. Unlike a traditional feed-forward network, an RNN maintains an internal hidden state that acts as memory, which lets it handle variable-length sequences and capture temporal dependencies.

python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset

# Basic recurrent building blocks
rnn = nn.RNN(input_size=100, hidden_size=128, num_layers=2, batch_first=True)
lstm = nn.LSTM(input_size=100, hidden_size=128, num_layers=2, batch_first=True)
gru = nn.GRU(input_size=100, hidden_size=128, num_layers=2, batch_first=True)
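
As a quick reference, here is a minimal sketch (the sample tensor below is illustrative) of calling these modules directly; with batch_first=True the input has shape (batch, seq_len, input_size) and the returned hidden states have shape (num_layers, batch, hidden_size):

python
# Illustrative call: inputs are (batch, seq_len, input_size) with batch_first=True
sample = torch.randn(4, 25, 100)

rnn_out, h_n = rnn(sample)            # rnn_out: (4, 25, 128), h_n: (2, 4, 128)
lstm_out, (h_n, c_n) = lstm(sample)   # the LSTM additionally returns a cell state
gru_out, h_n = gru(sample)

print(rnn_out.shape, h_n.shape)       # torch.Size([4, 25, 128]) torch.Size([2, 4, 128])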

Basic RNN

1. Simple RNN Implementation

python
class SimpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(SimpleRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # RNN layer
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            nonlinearity='tanh'  # 'tanh' or 'relu'
        )
        
        # Output layer
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x, hidden=None):
        # x shape: (batch_size, seq_len, input_size)
        batch_size = x.size(0)
        
        # Initialize the hidden state on the same device as the input
        if hidden is None:
            hidden = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=x.device)
        
        # RNN forward pass
        rnn_out, hidden = self.rnn(x, hidden)
        
        # Use only the output of the last time step
        output = self.fc(rnn_out[:, -1, :])
        
        return output, hidden

# Test the simple RNN
input_size, hidden_size, output_size = 10, 20, 5
seq_len, batch_size = 15, 32

model = SimpleRNN(input_size, hidden_size, output_size)
x = torch.randn(batch_size, seq_len, input_size)
output, hidden = model(x)

print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {hidden.shape}")

2. Bidirectional RNN

python
class BiRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(BiRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # Bidirectional RNN
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True
        )
        
        # Output layer (note: a bidirectional RNN's output dimension is 2 * hidden_size)
        self.fc = nn.Linear(hidden_size * 2, output_size)
    
    def forward(self, x):
        # Bidirectional RNN forward pass
        rnn_out, _ = self.rnn(x)
        
        # Use the output of the last time step
        output = self.fc(rnn_out[:, -1, :])
        
        return output

# Test the bidirectional RNN
bi_model = BiRNN(input_size, hidden_size, output_size)
bi_output = bi_model(x)
print(f"双向RNN输出形状: {bi_output.shape}")

LSTM Networks

1. Basic LSTM Implementation

python
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.2):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM layer
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=False
        )
        
        # Dropout layer
        self.dropout = nn.Dropout(dropout)
        
        # Output layer
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x, hidden=None):
        batch_size = x.size(0)
        
        # Initialize the hidden and cell states on the same device as the input
        if hidden is None:
            h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=x.device)
            c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=x.device)
            hidden = (h0, c0)
        
        # LSTM forward pass
        lstm_out, hidden = self.lstm(x, hidden)
        
        # Apply dropout
        lstm_out = self.dropout(lstm_out)
        
        # Output layer
        output = self.fc(lstm_out[:, -1, :])  # use the last time step
        
        return output, hidden

# Test the LSTM
lstm_model = LSTMModel(input_size, hidden_size, output_size)
lstm_output, lstm_hidden = lstm_model(x)
print(f"LSTM输出形状: {lstm_output.shape}")
print(f"LSTM隐藏状态形状: {lstm_hidden[0].shape}, {lstm_hidden[1].shape}")

2. Many-to-Many LSTM (Sequence-to-Sequence)

python
class Seq2SeqLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super(Seq2SeqLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # x shape: (batch_size, seq_len, input_size)
        lstm_out, _ = self.lstm(x)
        
        # Make a prediction at every time step
        output = self.fc(lstm_out)  # (batch_size, seq_len, output_size)
        
        return output

# Test the sequence-to-sequence LSTM
seq2seq_model = Seq2SeqLSTM(input_size, hidden_size, output_size)
seq2seq_output = seq2seq_model(x)
print(f"Seq2Seq输出形状: {seq2seq_output.shape}")

GRU Networks

1. GRU Implementation

python
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.2):
        super(GRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # GRU layer
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x, hidden=None):
        batch_size = x.size(0)
        
        # Initialize the hidden state on the same device as the input
        if hidden is None:
            hidden = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=x.device)
        
        gru_out, hidden = self.gru(x, hidden)
        gru_out = self.dropout(gru_out)
        output = self.fc(gru_out[:, -1, :])
        
        return output, hidden

# Test the GRU
gru_model = GRUModel(input_size, hidden_size, output_size)
gru_output, gru_hidden = gru_model(x)
print(f"GRU输出形状: {gru_output.shape}")

Attention Mechanisms

1. Basic Attention

python
class AttentionRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super(AttentionRNN, self).__init__()
        self.hidden_size = hidden_size
        
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        
        # Attention mechanism
        self.attention = nn.Linear(hidden_size, 1)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # LSTM outputs
        lstm_out, _ = self.lstm(x)  # (batch_size, seq_len, hidden_size)
        
        # Compute attention weights
        attention_weights = torch.softmax(self.attention(lstm_out), dim=1)  # (batch_size, seq_len, 1)
        
        # Weighted sum over time steps
        context_vector = torch.sum(attention_weights * lstm_out, dim=1)  # (batch_size, hidden_size)
        
        # Output
        output = self.fc(context_vector)
        
        return output, attention_weights

# Test the attention RNN
attention_model = AttentionRNN(input_size, hidden_size, output_size)
attention_output, attention_weights = attention_model(x)
print(f"注意力RNN输出形状: {attention_output.shape}")
print(f"注意力权重形状: {attention_weights.shape}")

2. Self-Attention

python
class SelfAttentionRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_heads=8):
        super(SelfAttentionRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads
        
        # LSTM layer
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        
        # Multi-head self-attention
        self.multihead_attn = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=num_heads,
            batch_first=True
        )
        
        # Layer normalization
        self.layer_norm = nn.LayerNorm(hidden_size)
        
        # Output layer
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        # LSTM encoding
        lstm_out, _ = self.lstm(x)
        
        # Self-attention
        attn_out, attn_weights = self.multihead_attn(lstm_out, lstm_out, lstm_out)
        
        # Residual connection and layer normalization
        out = self.layer_norm(lstm_out + attn_out)
        
        # Global average pooling over time
        out = torch.mean(out, dim=1)
        
        # Output
        output = self.fc(out)
        
        return output, attn_weights

# Test the self-attention RNN (hidden_size must be divisible by num_heads,
# so 20 hidden units cannot use the default of 8 heads)
self_attn_model = SelfAttentionRNN(input_size, hidden_size, output_size, num_heads=4)
self_attn_output, self_attn_weights = self_attn_model(x)
print(f"自注意力RNN输出形状: {self_attn_output.shape}")

Practical Application Examples

1. Text Classification

python
class TextClassificationRNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_size, num_classes, num_layers=2):
        super(TextClassificationRNN, self).__init__()
        
        # Word embedding layer
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        
        # LSTM layer
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.3,
            bidirectional=True
        )
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(hidden_size * 2, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_size, num_classes)
        )
    
    def forward(self, x):
        # Word embeddings
        embedded = self.embedding(x)  # (batch_size, seq_len, embed_dim)
        
        # LSTM encoding
        lstm_out, _ = self.lstm(embedded)
        
        # Use the output of the last time step
        output = self.classifier(lstm_out[:, -1, :])
        
        return output

# Create the text classification model
vocab_size, embed_dim, num_classes = 10000, 128, 5
text_model = TextClassificationRNN(vocab_size, embed_dim, hidden_size, num_classes)

# Test
text_input = torch.randint(0, vocab_size, (32, 50))  # 32 samples, 50 tokens each
text_output = text_model(text_input)
print(f"文本分类输出形状: {text_output.shape}")

2. Time Series Forecasting

python
class TimeSeriesPredictor(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=3, dropout=0.2):
        super(TimeSeriesPredictor, self).__init__()
        
        # Stacked LSTM layers
        self.lstm1 = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.dropout1 = nn.Dropout(dropout)
        
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.dropout2 = nn.Dropout(dropout)
        
        self.lstm3 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.dropout3 = nn.Dropout(dropout)
        
        # Output layer
        self.fc = nn.Linear(hidden_size, 1)
    
    def forward(self, x):
        # First LSTM layer
        out, _ = self.lstm1(x)
        out = self.dropout1(out)
        
        # Second LSTM layer
        out, _ = self.lstm2(out)
        out = self.dropout2(out)
        
        # Third LSTM layer
        out, _ = self.lstm3(out)
        out = self.dropout3(out)
        
        # Predict the next time step
        prediction = self.fc(out[:, -1, :])
        
        return prediction

# Create the time series forecasting model
ts_model = TimeSeriesPredictor(input_size=1, hidden_size=64)

# Generate example time series data
def generate_sine_wave(seq_len, num_samples):
    x = np.linspace(0, 4*np.pi, seq_len)
    data = []
    for _ in range(num_samples):
        phase = np.random.uniform(0, 2*np.pi)
        amplitude = np.random.uniform(0.5, 2.0)
        noise = np.random.normal(0, 0.1, seq_len)
        y = amplitude * np.sin(x + phase) + noise
        data.append(y)
    return np.array(data)

# Test the time series predictor
ts_data = generate_sine_wave(50, 32)
ts_input = torch.FloatTensor(ts_data).unsqueeze(-1)  # (32, 50, 1)
ts_output = ts_model(ts_input)
print(f"时间序列预测输出形状: {ts_output.shape}")

3. Sequence-to-Sequence Translation

python
class Seq2SeqTranslator(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, embed_dim, hidden_size):
        super(Seq2SeqTranslator, self).__init__()
        
        # Encoder
        self.src_embedding = nn.Embedding(src_vocab_size, embed_dim)
        self.encoder = nn.LSTM(embed_dim, hidden_size, batch_first=True)
        
        # Decoder
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, embed_dim)
        self.decoder = nn.LSTM(embed_dim, hidden_size, batch_first=True)
        
        # Output layer
        self.output_projection = nn.Linear(hidden_size, tgt_vocab_size)
    
    def forward(self, src, tgt):
        # Encode
        src_embedded = self.src_embedding(src)
        encoder_out, (hidden, cell) = self.encoder(src_embedded)
        
        # Decode
        tgt_embedded = self.tgt_embedding(tgt)
        decoder_out, _ = self.decoder(tgt_embedded, (hidden, cell))
        
        # Project to the target vocabulary
        output = self.output_projection(decoder_out)
        
        return output

# Create the translation model
src_vocab_size, tgt_vocab_size = 5000, 4000
translator = Seq2SeqTranslator(src_vocab_size, tgt_vocab_size, embed_dim, hidden_size)

# Test
src_seq = torch.randint(0, src_vocab_size, (32, 20))  # source sequences
tgt_seq = torch.randint(0, tgt_vocab_size, (32, 25))  # target sequences
translation_output = translator(src_seq, tgt_seq)
print(f"翻译输出形状: {translation_output.shape}")

Training Techniques

1. Gradient Clipping

python
def train_rnn_with_gradient_clipping(model, dataloader, criterion, optimizer, max_norm=1.0):
    model.train()
    total_loss = 0
    
    for batch_idx, (data, target) in enumerate(dataloader):
        optimizer.zero_grad()
        
        output = model(data)
        loss = criterion(output, target)
        
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
        
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(dataloader)
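
A sketch of calling this helper with the text classification model defined earlier and a small synthetic dataset (the data, batch size, and hyperparameters are illustrative):

python
from torch.utils.data import TensorDataset

# Synthetic classification data: random token ids and labels (illustrative)
fake_texts = torch.randint(0, vocab_size, (64, 50))
fake_labels = torch.randint(0, num_classes, (64,))
train_loader = DataLoader(TensorDataset(fake_texts, fake_labels), batch_size=16, shuffle=True)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(text_model.parameters(), lr=1e-3)

avg_loss = train_rnn_with_gradient_clipping(text_model, train_loader, criterion, optimizer, max_norm=1.0)
print(f"Average loss over one epoch: {avg_loss:.4f}")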

2. Learning Rate Warmup

python
class WarmupScheduler:
    def __init__(self, optimizer, warmup_steps, d_model):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.d_model = d_model
        self.step_num = 0
    
    def step(self):
        self.step_num += 1
        lr = self.d_model ** (-0.5) * min(
            self.step_num ** (-0.5),
            self.step_num * self.warmup_steps ** (-1.5)
        )
        
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr
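
This is the "Noam" schedule from the Transformer paper: lr = d_model^(-0.5) * min(step^(-0.5), step * warmup_steps^(-1.5)), which increases the learning rate linearly for warmup_steps and then decays it with the inverse square root of the step number. A minimal usage sketch (the optimizer settings and warmup length are illustrative):

python
# Call scheduler.step() before each optimizer.step() to update the learning rate
optimizer = torch.optim.Adam(lstm_model.parameters(), lr=0.0, betas=(0.9, 0.98), eps=1e-9)
scheduler = WarmupScheduler(optimizer, warmup_steps=4000, d_model=512)

for _ in range(5):
    scheduler.step()
    print(f"step {scheduler.step_num}: lr = {optimizer.param_groups[0]['lr']:.3e}")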

3. Sequence Packing (PackedSequence)

python
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class PackedRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(PackedRNN, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x, lengths):
        # Pack the padded sequences
        packed_x = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)
        
        # LSTM forward pass
        packed_out, (hidden, cell) = self.lstm(packed_x)
        
        # Unpack (re-pad) the sequences
        lstm_out, _ = pad_packed_sequence(packed_out, batch_first=True)
        
        # Get the last valid output of each sequence
        batch_size = x.size(0)
        last_outputs = []
        for i, length in enumerate(lengths):
            last_outputs.append(lstm_out[i, length-1, :])
        
        last_outputs = torch.stack(last_outputs)
        output = self.fc(last_outputs)
        
        return output

# Using packed sequences
def collate_fn(batch):
    # Assume batch is a list of the form [(seq1, label1), (seq2, label2), ...]
    sequences, labels = zip(*batch)
    lengths = [len(seq) for seq in sequences]
    
    # Pad sequences to the same length
    max_len = max(lengths)
    padded_sequences = []
    for seq in sequences:
        padded = torch.zeros(max_len, seq.size(-1))
        padded[:len(seq)] = seq
        padded_sequences.append(padded)
    
    return torch.stack(padded_sequences), torch.tensor(lengths), torch.tensor(labels)
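
A sketch of wiring collate_fn into a DataLoader, assuming a Dataset that yields variable-length (sequence, label) pairs; the VarLenDataset class below is illustrative:

python
# Illustrative dataset of variable-length feature sequences
class VarLenDataset(Dataset):
    def __init__(self, num_samples=100, feat_dim=10, num_classes=5):
        self.samples = [
            (torch.randn(np.random.randint(5, 20), feat_dim), np.random.randint(0, num_classes))
            for _ in range(num_samples)
        ]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]

loader = DataLoader(VarLenDataset(), batch_size=8, shuffle=True, collate_fn=collate_fn)
packed_model = PackedRNN(input_size=10, hidden_size=32, output_size=5)

padded, lengths, labels = next(iter(loader))
out = packed_model(padded, lengths)
print(f"Packed RNN output shape: {out.shape}")  # (8, 5)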

Visualization and Analysis

1. Visualizing Attention Weights

python
def visualize_attention(attention_weights, input_tokens, figsize=(10, 8)):
    """可视化注意力权重"""
    attention_weights = attention_weights.squeeze().detach().cpu().numpy()
    
    plt.figure(figsize=figsize)
    plt.imshow(attention_weights.T, cmap='Blues', aspect='auto')
    plt.colorbar()
    plt.xlabel('Time step')
    plt.ylabel('Input position')
    plt.title('Attention weight heatmap')
    
    if input_tokens:
        plt.yticks(range(len(input_tokens)), input_tokens)
    
    plt.tight_layout()
    plt.show()

# Usage example
# visualize_attention(attention_weights, ['word1', 'word2', 'word3', ...])

2. Visualizing Hidden States

python
def visualize_hidden_states(model, input_sequence, layer_idx=0):
    """可视化RNN隐藏状态的演化"""
    model.eval()
    
    hidden_states = []
    
    # Collect the hidden state at each time step
    with torch.no_grad():
        hidden = None
        for t in range(input_sequence.size(1)):
            input_t = input_sequence[:, t:t+1, :]
            output, hidden = model.lstm(input_t, hidden)
            if isinstance(hidden, tuple):  # LSTM
                hidden_states.append(hidden[0][layer_idx, 0, :].cpu().numpy())
            else:  # RNN/GRU
                hidden_states.append(hidden[layer_idx, 0, :].cpu().numpy())
    
    hidden_states = np.array(hidden_states)
    
    # Plot
    plt.figure(figsize=(12, 8))
    plt.imshow(hidden_states.T, cmap='viridis', aspect='auto')
    plt.colorbar()
    plt.xlabel('Time step')
    plt.ylabel('Hidden unit')
    plt.title(f'Hidden state evolution in layer {layer_idx}')
    plt.tight_layout()
    plt.show()
    
    return hidden_states
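
For example, it can be applied to the LSTMModel and input tensor defined earlier (illustrative usage):

python
# Illustrative usage with the LSTMModel defined above
hidden_states = visualize_hidden_states(lstm_model, x, layer_idx=0)
print(f"Collected hidden states: {hidden_states.shape}")  # (seq_len, hidden_size)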

Performance Optimization

1. Batch Processing Optimization

python
class OptimizedRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super(OptimizedRNN, self).__init__()
        
        # Use the efficient built-in (cuDNN-backed) LSTM implementation
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.1 if num_layers > 1 else 0
        )
        
        self.fc = nn.Linear(hidden_size, output_size)
        
        # Flatten weights to enable cuDNN optimization
        self.lstm.flatten_parameters()
    
    def forward(self, x):
        # Ensure the weights are contiguous in memory (cuDNN optimization)
        self.lstm.flatten_parameters()
        
        lstm_out, _ = self.lstm(x)
        output = self.fc(lstm_out[:, -1, :])
        
        return output

2. Memory Optimization

python
def train_with_checkpointing(model, dataloader, criterion, optimizer):
    """使用梯度检查点节省内存"""
    from torch.utils.checkpoint import checkpoint
    
    model.train()
    total_loss = 0
    
    for data, target in dataloader:
        optimizer.zero_grad()
        
        # Use gradient checkpointing (recompute activations during backward)
        def run_function(x):
            return model(x)

        # use_reentrant=False lets gradients reach the parameters even when
        # the input tensor itself does not require grad
        output = checkpoint(run_function, data, use_reentrant=False)
        loss = criterion(output, target)
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(dataloader)

Summary

Recurrent neural networks are an essential tool for processing sequence data. This chapter covered:

  1. Basic architectures: the principles and implementation of RNN, LSTM, and GRU
  2. Advanced techniques: attention mechanisms, bidirectional RNNs, sequence-to-sequence models
  3. Practical applications: text classification, time series forecasting, machine translation
  4. Training techniques: gradient clipping, sequence packing, learning rate scheduling
  5. Visualization and analysis: methods for visualizing attention weights and hidden states
  6. Performance optimization: batch processing and memory optimization techniques

Mastering RNNs will give you a solid foundation for applications in natural language processing, time series analysis, and related fields.
