PyTorch 循环神经网络
循环神经网络简介
循环神经网络(Recurrent Neural Network, RNN)是专门处理序列数据的神经网络架构。与传统的前馈神经网络不同,RNN具有记忆能力,能够处理变长序列并捕捉时间依赖关系。
python
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset
# Basic recurrent building blocks: 100 input features, 128 hidden units, 2 stacked layers.
# batch_first=True makes all of them consume tensors shaped (batch, seq_len, features).
rnn = nn.RNN(input_size=100, hidden_size=128, num_layers=2, batch_first=True)
lstm = nn.LSTM(input_size=100, hidden_size=128, num_layers=2, batch_first=True)
gru = nn.GRU(input_size=100, hidden_size=128, num_layers=2, batch_first=True)

基础RNN
1. 简单RNN实现
python
class SimpleRNN(nn.Module):
    """Many-to-one vanilla RNN: encode a sequence, project the last step.

    Args:
        input_size: number of features per time step.
        hidden_size: RNN hidden dimension.
        output_size: dimension of the linear output head.
        num_layers: number of stacked RNN layers.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(SimpleRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # RNN layer
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            nonlinearity='tanh'  # 'tanh' or 'relu'
        )
        # Output head
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden=None):
        """x: (batch, seq_len, input_size) -> (logits (batch, output_size), h_n)."""
        batch_size = x.size(0)
        # Lazily build the initial hidden state on the input's device/dtype.
        # (The original .cuda() branch only handled CUDA and broke on MPS/XPU.)
        if hidden is None:
            hidden = torch.zeros(
                self.num_layers, batch_size, self.hidden_size,
                device=x.device, dtype=x.dtype,
            )
        # RNN forward pass
        rnn_out, hidden = self.rnn(x, hidden)
        # Only the last time step feeds the output head.
        output = self.fc(rnn_out[:, -1, :])
        return output, hidden
# Smoke-test SimpleRNN on random data.
input_size, hidden_size, output_size = 10, 20, 5
seq_len, batch_size = 15, 32
model = SimpleRNN(input_size, hidden_size, output_size)
x = torch.randn(batch_size, seq_len, input_size)
output, hidden = model(x)
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {hidden.shape}")

2. 双向RNN
python
class BiRNN(nn.Module):
    """Bidirectional many-to-one RNN with a linear output head."""

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(BiRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # Bidirectional RNN
        self.rnn = nn.RNN(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True
        )
        # Forward/backward states are concatenated, so the head sees 2*hidden_size.
        self.fc = nn.Linear(hidden_size * 2, output_size)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, output_size)."""
        rnn_out, _ = self.rnn(x)
        # NOTE(review): at step -1 the backward direction has processed only one
        # element; pooling or per-direction last states is often preferable.
        # Kept as-is to preserve the tutorial's behavior.
        output = self.fc(rnn_out[:, -1, :])
        return output
# Smoke-test the bidirectional RNN (reuses x from the SimpleRNN demo above).
bi_model = BiRNN(input_size, hidden_size, output_size)
bi_output = bi_model(x)
print(f"双向RNN输出形状: {bi_output.shape}")

LSTM网络
1. LSTM基础实现
python
class LSTMModel(nn.Module):
    """Many-to-one LSTM with dropout and a linear output head.

    Args:
        input_size: features per time step.
        hidden_size: LSTM hidden dimension.
        output_size: dimension of the output head.
        num_layers: stacked LSTM layers.
        dropout: dropout probability (between layers and before the head).
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.2):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # LSTM stack; nn.LSTM warns if dropout is set with a single layer.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=False
        )
        # Dropout applied to the LSTM outputs before the head.
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden=None):
        """x: (batch, seq_len, input_size) -> (logits, (h_n, c_n))."""
        batch_size = x.size(0)
        if hidden is None:
            # Build (h0, c0) on the input's device/dtype; the original
            # .cuda()-only branch failed on non-CUDA accelerators.
            h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size,
                             device=x.device, dtype=x.dtype)
            c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size,
                             device=x.device, dtype=x.dtype)
            hidden = (h0, c0)
        lstm_out, hidden = self.lstm(x, hidden)
        lstm_out = self.dropout(lstm_out)
        # Classify from the final time step only.
        output = self.fc(lstm_out[:, -1, :])
        return output, hidden
# Smoke-test the LSTM model.
lstm_model = LSTMModel(input_size, hidden_size, output_size)
lstm_output, lstm_hidden = lstm_model(x)
print(f"LSTM输出形状: {lstm_output.shape}")
print(f"LSTM隐藏状态形状: {lstm_hidden[0].shape}, {lstm_hidden[1].shape}")

2. 多对多LSTM(序列到序列)
python
class Seq2SeqLSTM(nn.Module):
    """Many-to-many LSTM: emits a prediction at every time step."""

    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super(Seq2SeqLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, seq_len, output_size)."""
        lstm_out, _ = self.lstm(x)
        # The linear head is applied to every time step (broadcast over seq dim).
        output = self.fc(lstm_out)
        return output
# Smoke-test the sequence-to-sequence LSTM.
seq2seq_model = Seq2SeqLSTM(input_size, hidden_size, output_size)
seq2seq_output = seq2seq_model(x)
print(f"Seq2Seq输出形状: {seq2seq_output.shape}")

GRU网络
1. GRU实现
python
class GRUModel(nn.Module):
    """Many-to-one GRU with dropout and a linear output head."""

    def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.2):
        super(GRUModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # GRU stack; nn.GRU warns if dropout is set with a single layer.
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden=None):
        """x: (batch, seq_len, input_size) -> (logits, h_n)."""
        batch_size = x.size(0)
        if hidden is None:
            # Create the initial state on the input's device/dtype
            # (device-agnostic, unlike the original .cuda()-only branch).
            hidden = torch.zeros(
                self.num_layers, batch_size, self.hidden_size,
                device=x.device, dtype=x.dtype,
            )
        gru_out, hidden = self.gru(x, hidden)
        gru_out = self.dropout(gru_out)
        output = self.fc(gru_out[:, -1, :])
        return output, hidden
# Smoke-test the GRU model.
gru_model = GRUModel(input_size, hidden_size, output_size)
gru_output, gru_hidden = gru_model(x)
print(f"GRU输出形状: {gru_output.shape}")

注意力机制
1. 基础注意力
python
class AttentionRNN(nn.Module):
    """LSTM encoder pooled by a learned single-query attention over time."""

    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        super(AttentionRNN, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True
        )
        # Scores every time step with one learned vector.
        self.attention = nn.Linear(hidden_size, 1)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (logits, attention_weights)."""
        lstm_out, _ = self.lstm(x)  # (batch, seq_len, hidden_size)
        # Softmax over the time axis so each sequence's weights sum to 1.
        attention_weights = torch.softmax(self.attention(lstm_out), dim=1)  # (batch, seq_len, 1)
        # Attention-weighted sum over time -> one context vector per sequence.
        context_vector = torch.sum(attention_weights * lstm_out, dim=1)  # (batch, hidden_size)
        output = self.fc(context_vector)
        return output, attention_weights
# Smoke-test the attention RNN.
attention_model = AttentionRNN(input_size, hidden_size, output_size)
attention_output, attention_weights = attention_model(x)
print(f"注意力RNN输出形状: {attention_output.shape}")
print(f"注意力权重形状: {attention_weights.shape}")

2. 自注意力机制
python
class SelfAttentionRNN(nn.Module):
    """LSTM followed by multi-head self-attention, residual + LayerNorm,
    mean-pooling over time, and a linear head.

    Note: hidden_size must be divisible by num_heads
    (nn.MultiheadAttention raises otherwise).
    """

    def __init__(self, input_size, hidden_size, output_size, num_heads=8):
        super(SelfAttentionRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads
        # LSTM encoder (single layer).
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        # Multi-head self-attention over the LSTM outputs.
        self.multihead_attn = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=num_heads,
            batch_first=True
        )
        self.layer_norm = nn.LayerNorm(hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (logits, attn_weights)."""
        lstm_out, _ = self.lstm(x)
        # Self-attention: queries, keys and values are all the LSTM outputs.
        attn_out, attn_weights = self.multihead_attn(lstm_out, lstm_out, lstm_out)
        # Residual connection followed by layer normalization.
        out = self.layer_norm(lstm_out + attn_out)
        # Global average pooling over the time axis.
        out = torch.mean(out, dim=1)
        output = self.fc(out)
        return output, attn_weights
# Smoke-test the self-attention RNN.
# Bug fix: hidden_size is 20 here, and the default num_heads=8 does not divide
# 20, which makes nn.MultiheadAttention raise at construction. Use 4 heads.
self_attn_model = SelfAttentionRNN(input_size, hidden_size, output_size, num_heads=4)
self_attn_output, self_attn_weights = self_attn_model(x)
print(f"自注意力RNN输出形状: {self_attn_output.shape}")

实际应用示例
1. 文本分类
python
class TextClassificationRNN(nn.Module):
    """Text classifier: embedding -> bidirectional LSTM -> MLP head.

    Args:
        vocab_size: embedding table size.
        embed_dim: embedding dimension.
        hidden_size: LSTM hidden dimension per direction.
        num_classes: number of output classes.
        num_layers: stacked LSTM layers.
    """

    def __init__(self, vocab_size, embed_dim, hidden_size, num_classes, num_layers=2):
        super(TextClassificationRNN, self).__init__()
        # Token embedding
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Bidirectional LSTM encoder
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.3,
            bidirectional=True
        )
        # MLP classification head; input is 2*hidden (both directions).
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(hidden_size * 2, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_size, num_classes)
        )

    def forward(self, x):
        """x: (batch, seq_len) int token ids -> (batch, num_classes) logits."""
        embedded = self.embedding(x)  # (batch, seq_len, embed_dim)
        lstm_out, _ = self.lstm(embedded)
        # Classify from the last time step's concatenated fwd/bwd states.
        output = self.classifier(lstm_out[:, -1, :])
        return output
# Build the text-classification model.
vocab_size, embed_dim, num_classes = 10000, 128, 5
text_model = TextClassificationRNN(vocab_size, embed_dim, hidden_size, num_classes)
# Smoke-test it.
text_input = torch.randint(0, vocab_size, (32, 50)) # 32 samples, 50 tokens each
text_output = text_model(text_input)
print(f"文本分类输出形状: {text_output.shape}")

2. 时间序列预测
python
class TimeSeriesPredictor(nn.Module):
    """Three-layer LSTM regressor predicting the next scalar of a series.

    NOTE(review): ``num_layers`` is accepted for API compatibility but the
    stack is hard-coded to exactly three single-layer LSTMs with dropout
    in between; only ``dropout`` is actually used.
    """

    def __init__(self, input_size, hidden_size, num_layers=3, dropout=0.2):
        super(TimeSeriesPredictor, self).__init__()
        # Explicit three-LSTM stack with inter-layer dropout.
        self.lstm1 = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.dropout1 = nn.Dropout(dropout)
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.dropout2 = nn.Dropout(dropout)
        self.lstm3 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.dropout3 = nn.Dropout(dropout)
        # Scalar regression head.
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """x: (batch, seq_len, input_size) -> (batch, 1) next-step prediction."""
        out, _ = self.lstm1(x)
        out = self.dropout1(out)
        out, _ = self.lstm2(out)
        out = self.dropout2(out)
        out, _ = self.lstm3(out)
        out = self.dropout3(out)
        # Predict from the last time step only.
        prediction = self.fc(out[:, -1, :])
        return prediction
# Build the time-series predictor (univariate input).
ts_model = TimeSeriesPredictor(input_size=1, hidden_size=64)
# Generate synthetic time-series data (noisy sine waves).
def generate_sine_wave(seq_len, num_samples):
    """Return noisy sine waves with random phase/amplitude.

    Args:
        seq_len: points per series (over two full periods, 0..4*pi).
        num_samples: number of independent series.

    Returns:
        np.ndarray of shape (num_samples, seq_len).
    """
    x = np.linspace(0, 4*np.pi, seq_len)
    data = []
    for _ in range(num_samples):
        phase = np.random.uniform(0, 2*np.pi)
        amplitude = np.random.uniform(0.5, 2.0)
        noise = np.random.normal(0, 0.1, seq_len)
        y = amplitude * np.sin(x + phase) + noise
        data.append(y)
    return np.array(data)
# Smoke-test the time-series predictor.
ts_data = generate_sine_wave(50, 32)
ts_input = torch.FloatTensor(ts_data).unsqueeze(-1) # (32, 50, 1)
ts_output = ts_model(ts_input)
print(f"时间序列预测输出形状: {ts_output.shape}")

3. 序列到序列翻译
python
class Seq2SeqTranslator(nn.Module):
    """Minimal encoder-decoder LSTM for translation (teacher forcing only).

    The encoder's final (hidden, cell) states seed the decoder; the decoder
    consumes the full target sequence in one pass, so this forward is for
    training, not autoregressive inference.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, embed_dim, hidden_size):
        super(Seq2SeqTranslator, self).__init__()
        # Encoder side
        self.src_embedding = nn.Embedding(src_vocab_size, embed_dim)
        self.encoder = nn.LSTM(embed_dim, hidden_size, batch_first=True)
        # Decoder side
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, embed_dim)
        self.decoder = nn.LSTM(embed_dim, hidden_size, batch_first=True)
        # Projects decoder states onto the target vocabulary.
        self.output_projection = nn.Linear(hidden_size, tgt_vocab_size)

    def forward(self, src, tgt):
        """src: (batch, src_len); tgt: (batch, tgt_len) -> (batch, tgt_len, tgt_vocab)."""
        # Encode the source; keep only the final states as context.
        src_embedded = self.src_embedding(src)
        encoder_out, (hidden, cell) = self.encoder(src_embedded)
        # Decode the target conditioned on the encoder's final states.
        tgt_embedded = self.tgt_embedding(tgt)
        decoder_out, _ = self.decoder(tgt_embedded, (hidden, cell))
        output = self.output_projection(decoder_out)
        return output
# Build the translation model.
src_vocab_size, tgt_vocab_size = 5000, 4000
translator = Seq2SeqTranslator(src_vocab_size, tgt_vocab_size, embed_dim, hidden_size)
# Smoke-test it.
src_seq = torch.randint(0, src_vocab_size, (32, 20)) # source sequences
tgt_seq = torch.randint(0, tgt_vocab_size, (32, 25)) # target sequences
translation_output = translator(src_seq, tgt_seq)
print(f"翻译输出形状: {translation_output.shape}")

训练技巧
1. 梯度裁剪
python
def train_rnn_with_gradient_clipping(model, dataloader, criterion, optimizer, max_norm=1.0):
model.train()
total_loss = 0
for batch_idx, (data, target) in enumerate(dataloader):
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
optimizer.step()
total_loss += loss.item()
return total_loss / len(dataloader)2. 学习率预热
python
class WarmupScheduler:
def __init__(self, optimizer, warmup_steps, d_model):
self.optimizer = optimizer
self.warmup_steps = warmup_steps
self.d_model = d_model
self.step_num = 0
def step(self):
self.step_num += 1
lr = self.d_model ** (-0.5) * min(
self.step_num ** (-0.5),
self.step_num * self.warmup_steps ** (-1.5)
)
for param_group in self.optimizer.param_groups:
param_group['lr'] = lr3. 序列打包(PackedSequence)
python
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
class PackedRNN(nn.Module):
    """LSTM over variable-length sequences via pack/pad utilities."""

    def __init__(self, input_size, hidden_size, output_size):
        super(PackedRNN, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, lengths):
        """x: (batch, max_len, input_size) zero-padded; lengths: true length per row.

        Returns (batch, output_size) computed from each row's last *valid* step.
        """
        # Pack so the LSTM never processes padding.
        packed_x = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)
        packed_out, (hidden, cell) = self.lstm(packed_x)
        # Unpack back to a padded (batch, max_valid_len, hidden) tensor.
        lstm_out, _ = pad_packed_sequence(packed_out, batch_first=True)
        # Gather each sequence's last valid output in one vectorized index
        # (the original did this with a Python loop, one row at a time).
        idx = torch.as_tensor(lengths, device=lstm_out.device) - 1
        rows = torch.arange(lstm_out.size(0), device=lstm_out.device)
        last_outputs = lstm_out[rows, idx]
        output = self.fc(last_outputs)
        return output
# Collate helper for a DataLoader over variable-length sequences.
def collate_fn(batch):
    """batch: list of (sequence, label) where each sequence is (len_i, feat).

    Returns:
        (padded (B, max_len, feat), lengths (B,) long, labels (B,)).
    """
    sequences, labels = zip(*batch)
    lengths = [len(seq) for seq in sequences]
    # Zero-pad every sequence up to the longest one in this batch.
    max_len = max(lengths)
    padded_sequences = []
    for seq in sequences:
        padded = torch.zeros(max_len, seq.size(-1))
        padded[:len(seq)] = seq
        padded_sequences.append(padded)
    return torch.stack(padded_sequences), torch.tensor(lengths), torch.tensor(labels)

可视化和分析
1. 注意力权重可视化
python
def visualize_attention(attention_weights, input_tokens, figsize=(10, 8)):
    """Plot attention weights as a heat map.

    Args:
        attention_weights: weight tensor; squeezed, detached and moved to CPU,
            then transposed so rows are input positions and columns are steps.
        input_tokens: optional labels for the y axis (one per input position).
        figsize: matplotlib figure size.
    """
    attention_weights = attention_weights.squeeze().detach().cpu().numpy()
    plt.figure(figsize=figsize)
    plt.imshow(attention_weights.T, cmap='Blues', aspect='auto')
    plt.colorbar()
    plt.xlabel('时间步')
    plt.ylabel('输入位置')
    plt.title('注意力权重热力图')
    if input_tokens:
        plt.yticks(range(len(input_tokens)), input_tokens)
    plt.tight_layout()
    plt.show()
# 使用示例
# visualize_attention(attention_weights, ['word1', 'word2', 'word3', ...])

2. 隐藏状态可视化
python
def visualize_hidden_states(model, input_sequence, layer_idx=0):
    """Plot how one layer's hidden state evolves over time (batch sample 0).

    Feeds the sequence through ``model.lstm`` one step at a time, recording
    the hidden state after each step, then renders a heat map.

    Args:
        model: a module exposing a recurrent layer as ``model.lstm``.
        input_sequence: (batch, seq_len, input_size) tensor.
        layer_idx: which stacked layer's hidden state to record.

    Returns:
        np.ndarray of shape (seq_len, hidden_size).
    """
    model.eval()
    hidden_states = []
    with torch.no_grad():
        hidden = None
        for t in range(input_sequence.size(1)):
            # Single time step: (batch, 1, input_size).
            input_t = input_sequence[:, t:t+1, :]
            output, hidden = model.lstm(input_t, hidden)
            if isinstance(hidden, tuple):  # LSTM returns (h, c)
                hidden_states.append(hidden[0][layer_idx, 0, :].cpu().numpy())
            else:  # vanilla RNN / GRU return h only
                hidden_states.append(hidden[layer_idx, 0, :].cpu().numpy())
    hidden_states = np.array(hidden_states)
    # Heat map: rows = hidden units, columns = time steps.
    plt.figure(figsize=(12, 8))
    plt.imshow(hidden_states.T, cmap='viridis', aspect='auto')
    plt.colorbar()
    plt.xlabel('时间步')
    plt.ylabel('隐藏单元')
    plt.title(f'第{layer_idx}层隐藏状态演化')
    plt.tight_layout()
    plt.show()
    return hidden_states

性能优化
1. 批量处理优化
python
class OptimizedRNN(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=2):
super(OptimizedRNN, self).__init__()
# 使用更高效的LSTM实现
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=0.1 if num_layers > 1 else 0
)
self.fc = nn.Linear(hidden_size, output_size)
# 启用cuDNN优化
self.lstm.flatten_parameters()
def forward(self, x):
# 确保参数是连续的(cuDNN优化)
self.lstm.flatten_parameters()
lstm_out, _ = self.lstm(x)
output = self.fc(lstm_out[:, -1, :])
return output2. 内存优化
python
def train_with_checkpointing(model, dataloader, criterion, optimizer):
    """Run one training epoch using gradient checkpointing to save memory.

    Activations are recomputed during backward instead of stored, trading
    compute for a smaller memory footprint.

    Returns:
        Mean batch loss over the epoch.
    """
    from torch.utils.checkpoint import checkpoint
    model.train()
    total_loss = 0
    for data, target in dataloader:
        optimizer.zero_grad()
        # Wrap the forward pass so checkpoint() can re-run it in backward.
        def run_function(x):
            return model(x)
        output = checkpoint(run_function, data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

总结
循环神经网络是处理序列数据的重要工具,本章介绍了:
- 基础架构:RNN、LSTM、GRU的原理和实现
- 高级技术:注意力机制、双向RNN、序列到序列模型
- 实际应用:文本分类、时间序列预测、机器翻译
- 训练技巧:梯度裁剪、序列打包、学习率调度
- 可视化分析:注意力权重、隐藏状态的可视化方法
- 性能优化:批量处理、内存优化技术
掌握RNN将为你在自然语言处理、时间序列分析等领域的应用打下坚实基础!