Transformer Models
The Transformer is an attention-based neural network architecture introduced by Vaswani et al. in the 2017 paper "Attention Is All You Need". It has revolutionized natural language processing and become a cornerstone of modern NLP.
Transformer Fundamentals
What Is a Transformer?
The Transformer relies entirely on attention mechanisms, dispensing with the recurrent and convolutional structures of earlier architectures. Because it attends to all positions in a sequence in parallel, it trains far more efficiently than recurrent models while excelling at capturing long-range dependencies.
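In matrix form, the scaled dot-product attention at the heart of the architecture is

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{QK^{\top}}{\sqrt{d_k}}\right)V$$

where $Q$, $K$, and $V$ are the query, key, and value matrices and $d_k$ is the key dimension. Dividing by $\sqrt{d_k}$ keeps the dot products from growing with dimension and saturating the softmax. The code in the next section implements exactly this.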
Core Components
```python
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt

def scaled_dot_product_attention(q, k, v, mask):
    """Compute the attention weights and the weighted sum of values."""
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    # Scale by sqrt(d_k)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
    # Apply the mask: masked positions get a large negative logit,
    # so they vanish after the softmax
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)
    # Softmax over the last axis (the key dimension)
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights

# Core component of the Transformer: multi-head attention
class MultiHeadAttention(keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % self.num_heads == 0
        self.depth = d_model // self.num_heads
        self.wq = keras.layers.Dense(d_model)
        self.wk = keras.layers.Dense(d_model)
        self.wv = keras.layers.Dense(d_model)
        self.dense = keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Split the last dimension into (num_heads, depth) and transpose
        to (batch_size, num_heads, seq_len, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]
        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)
        v = self.wv(v)
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))
        output = self.dense(concat_attention)
        return output, attention_weights
```
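As a quick sanity check, the layer can be exercised on random inputs (the shapes below are illustrative only):

```python
temp_mha = MultiHeadAttention(d_model=512, num_heads=8)
y = tf.random.uniform((1, 60, 512))  # (batch_size, seq_len, d_model)
out, attn = temp_mha(y, k=y, q=y, mask=None)
print(out.shape)   # (1, 60, 512)
print(attn.shape)  # (1, 8, 60, 60): one attention map per head
```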
Positional Encoding
```python
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                            np.arange(d_model)[np.newaxis, :],
                            d_model)
    # Apply sin to even indices
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    # Apply cos to odd indices
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    pos_encoding = angle_rads[np.newaxis, ...]
    return tf.cast(pos_encoding, dtype=tf.float32)

# Visualize the positional encoding
def plot_positional_encoding(pos_encoding):
    plt.figure(figsize=(15, 5))
    plt.pcolormesh(pos_encoding[0], cmap='RdYlBu')
    plt.xlabel('Depth')
    plt.xlim((0, 512))
    plt.ylabel('Position')
    plt.colorbar()
    plt.title('Positional Encoding')
    plt.show()

# Example
pos_encoding = positional_encoding(50, 512)
print(pos_encoding.shape)  # (1, 50, 512)
# plot_positional_encoding(pos_encoding)
```
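The functions above implement the sinusoidal encoding from the original paper:

$$PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right), \qquad PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right)$$

Each dimension corresponds to a sinusoid, with wavelengths forming a geometric progression from $2\pi$ to $10000 \cdot 2\pi$, which lets the model attend to relative positions.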
Feed-Forward Network
```python
def point_wise_feed_forward_network(d_model, dff):
    return keras.Sequential([
        keras.layers.Dense(dff, activation='relu'),  # (batch, seq_len, dff)
        keras.layers.Dense(d_model)                  # (batch, seq_len, d_model)
    ])

# Example
sample_ffn = point_wise_feed_forward_network(512, 2048)
print(sample_ffn(tf.random.uniform((64, 50, 512))).shape)  # (64, 50, 512)
```
Encoder Layer
```python
class EncoderLayer(keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        # Self-attention with residual connection and layer normalization
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)
        # Feed-forward network with residual connection and layer normalization
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)
        return out2

# Test the encoder layer
sample_encoder_layer = EncoderLayer(512, 8, 2048)
sample_encoder_layer_output = sample_encoder_layer(
    tf.random.uniform((64, 43, 512)), False, None)
print(sample_encoder_layer_output.shape)  # (64, 43, 512)
```
Decoder Layer
```python
class DecoderLayer(keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)
        self.dropout3 = keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # Masked self-attention
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)
        # Encoder-decoder (cross) attention: queries come from the decoder,
        # keys and values from the encoder output
        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)
        # Feed-forward network
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)
        return out3, attn_weights_block1, attn_weights_block2
```
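A quick smoke test, mirroring the encoder-layer test above (note that the target and source sequence lengths need not match):

```python
sample_decoder_layer = DecoderLayer(512, 8, 2048)
sample_decoder_layer_output, _, _ = sample_decoder_layer(
    tf.random.uniform((64, 50, 512)),   # target-side input
    sample_encoder_layer_output,        # encoder output from the test above
    False, None, None)
print(sample_decoder_layer_output.shape)  # (64, 50, 512)
```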
The Complete Transformer Model
```python
class Encoder(keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding,
                                                self.d_model)
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]
        # Token embedding plus positional encoding
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)
        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)
        return x

class Decoder(keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
                 maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)
        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                                   look_ahead_mask, padding_mask)
            attention_weights[f'decoder_layer{i+1}_block1'] = block1
            attention_weights[f'decoder_layer{i+1}_block2'] = block2
        return x, attention_weights

class Transformer(keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 target_vocab_size, pe_input, pe_target, rate=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, dff,
                               input_vocab_size, pe_input, rate)
        self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                               target_vocab_size, pe_target, rate)
        self.final_layer = keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, training, enc_padding_mask,
             look_ahead_mask, dec_padding_mask):
        enc_output = self.encoder(inp, training, enc_padding_mask)
        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, look_ahead_mask, dec_padding_mask)
        # (batch, tar_seq_len, target_vocab_size)
        final_output = self.final_layer(dec_output)
        return final_output, attention_weights
```
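To confirm the pieces fit together, here is a dummy forward pass with a small configuration (masks are covered in the next section, so we simply pass None; all values are illustrative):

```python
sample_transformer = Transformer(
    num_layers=2, d_model=64, num_heads=4, dff=256,
    input_vocab_size=1000, target_vocab_size=1200,
    pe_input=1000, pe_target=1000)
temp_inp = tf.random.uniform((8, 38), minval=0, maxval=1000, dtype=tf.int64)
temp_tar = tf.random.uniform((8, 36), minval=0, maxval=1200, dtype=tf.int64)
out, _ = sample_transformer(temp_inp, temp_tar, False, None, None, None)
print(out.shape)  # (8, 36, 1200): one logit per target vocabulary entry
```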
Masking
```python
def create_padding_mask(seq):
    # Mark padding tokens (ID 0) with 1 so attention can ignore them
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]  # (batch, 1, 1, seq_len)

def create_look_ahead_mask(size):
    # Upper-triangular mask that hides future positions
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask  # (seq_len, seq_len)

def create_masks(inp, tar):
    # Padding mask for the encoder
    enc_padding_mask = create_padding_mask(inp)
    # Padding mask for the decoder's second (cross) attention block
    dec_padding_mask = create_padding_mask(inp)
    # Look-ahead mask for the decoder's first (self) attention block,
    # combined with the target padding mask
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
    return enc_padding_mask, combined_mask, dec_padding_mask
```
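A toy batch makes the two mask types concrete:

```python
# 0 is the padding token ID
x = tf.constant([[7, 6, 0, 0, 1], [1, 2, 3, 0, 0]])
print(create_padding_mask(x))      # shape (2, 1, 1, 5); 1.0 marks padding
print(create_look_ahead_mask(4))   # shape (4, 4); 1.0 hides future tokens
```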
Training Setup
```python
class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule from the original paper: linear warmup,
    then decay proportional to the inverse square root of the step."""
    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        step = tf.cast(step, tf.float32)  # optimizer passes an integer step
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# Learning-rate schedule and optimizer
learning_rate = CustomSchedule(512)
optimizer = keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                  epsilon=1e-9)

# Loss function (per-token loss; padding positions are masked out below)
loss_object = keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_sum(loss_) / tf.reduce_sum(mask)

# Metrics
train_loss = keras.metrics.Mean(name='train_loss')
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
```
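This schedule implements the formula from the original paper,

$$lrate = d_{\text{model}}^{-0.5} \cdot \min\!\left(step^{-0.5},\; step \cdot warmup\_steps^{-1.5}\right)$$

rising linearly for the first `warmup_steps` steps and then decaying as the inverse square root of the step. Plotting it makes the shape obvious:

```python
temp_schedule = CustomSchedule(512)
plt.plot(temp_schedule(tf.range(40000, dtype=tf.float32)))
plt.xlabel('Train step')
plt.ylabel('Learning rate')
plt.show()
```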
Text Classification Example
```python
def create_transformer_classifier(vocab_size, d_model=128, num_heads=8,
                                  num_layers=4, dff=512, max_seq_len=512,
                                  num_classes=2):
    """Build a Transformer encoder for text classification."""
    inputs = keras.layers.Input(shape=(max_seq_len,))
    # Token embedding
    embedding = keras.layers.Embedding(vocab_size, d_model)(inputs)
    # Positional encoding
    pos_encoding = positional_encoding(max_seq_len, d_model)
    embedding += pos_encoding[:, :max_seq_len, :]
    # Transformer encoder layers (using Keras's built-in attention layer)
    x = embedding
    for _ in range(num_layers):
        # Multi-head self-attention
        attn_output = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model // num_heads
        )(x, x)
        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + attn_output)
        # Feed-forward network
        ffn_output = keras.layers.Dense(dff, activation='relu')(x)
        ffn_output = keras.layers.Dense(d_model)(ffn_output)
        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + ffn_output)
    # Global average pooling over the sequence dimension
    pooled = keras.layers.GlobalAveragePooling1D()(x)
    # Classification head
    outputs = keras.layers.Dense(num_classes, activation='softmax')(pooled)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Build and compile the classifier
classifier = create_transformer_classifier(vocab_size=10000)
classifier.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
classifier.summary()
```
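Training then follows the standard Keras workflow. The data below is a random stand-in purely for illustration; real inputs should be tokenized, 0-padded sequences of length `max_seq_len`:

```python
X_train = np.random.randint(0, 10000, size=(1000, 512))  # dummy token IDs
y_train = np.random.randint(0, 2, size=(1000,))          # dummy labels
classifier.fit(X_train, y_train, batch_size=32, epochs=3,
               validation_split=0.1)
```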
Machine Translation Example
```python
def create_translation_model():
    """Build a Transformer for machine translation."""
    # Model hyperparameters
    num_layers = 4
    d_model = 128
    dff = 512
    num_heads = 8
    input_vocab_size = 8500
    target_vocab_size = 8000
    # Build the Transformer
    transformer = Transformer(
        num_layers, d_model, num_heads, dff,
        input_vocab_size, target_vocab_size,
        pe_input=input_vocab_size,
        pe_target=target_vocab_size
    )
    return transformer

# One training step with teacher forcing: the decoder sees the target
# shifted right and is trained to predict the next token
@tf.function
def train_step(inp, tar, transformer, optimizer):
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]
    enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar_inp)
    with tf.GradientTape() as tape:
        predictions, _ = transformer(inp, tar_inp,
                                     True,
                                     enc_padding_mask,
                                     combined_mask,
                                     dec_padding_mask)
        loss = loss_function(tar_real, predictions)
    gradients = tape.gradient(loss, transformer.trainable_variables)
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
    train_loss(loss)
    train_accuracy(tar_real, predictions)
```
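A driver loop over a dataset might look like the sketch below. Here `train_batches` is an assumed `tf.data.Dataset` yielding `(inp, tar)` pairs of token IDs; it is not defined in this chapter:

```python
transformer = create_translation_model()
EPOCHS = 20
for epoch in range(EPOCHS):
    train_loss.reset_states()
    train_accuracy.reset_states()
    for inp, tar in train_batches:  # hypothetical dataset
        train_step(inp, tar, transformer, optimizer)
    print(f'Epoch {epoch + 1}: loss {train_loss.result():.4f}, '
          f'accuracy {train_accuracy.result():.4f}')
```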
A BERT-Style Pretraining Model
```python
def create_bert_style_model(vocab_size, d_model=768, num_heads=12,
                            num_layers=12, dff=3072, max_seq_len=512):
    """Build a BERT-style encoder-only pretraining model."""
    inputs = keras.layers.Input(shape=(max_seq_len,))
    # Token embedding
    embedding = keras.layers.Embedding(vocab_size, d_model)(inputs)
    # Positional encoding
    pos_encoding = positional_encoding(max_seq_len, d_model)
    embedding += pos_encoding[:, :max_seq_len, :]
    # Transformer encoder layers
    x = embedding
    for _ in range(num_layers):
        # Multi-head self-attention
        attn_output = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model // num_heads
        )(x, x)
        # Dropout and residual connection
        attn_output = keras.layers.Dropout(0.1)(attn_output)
        x = keras.layers.LayerNormalization()(x + attn_output)
        # Feed-forward network (BERT uses GELU activations)
        ffn_output = keras.layers.Dense(dff, activation='gelu')(x)
        ffn_output = keras.layers.Dense(d_model)(ffn_output)
        ffn_output = keras.layers.Dropout(0.1)(ffn_output)
        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + ffn_output)
    # Output heads for different tasks
    # MLM head (masked language modeling): one logit per vocabulary entry
    mlm_output = keras.layers.Dense(vocab_size)(x)
    # Classification head for sentence-level tasks, applied to the first
    # ([CLS]-style) token
    cls_output = keras.layers.Dense(2, activation='softmax')(x[:, 0, :])
    model = keras.Model(inputs=inputs, outputs=[mlm_output, cls_output])
    return model
```
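The default arguments match BERT-base (12 layers, 768 hidden units, 12 heads, roughly 110M parameters), which is heavy to instantiate casually. A scaled-down configuration (illustrative values) is handy for experimentation:

```python
small_bert = create_bert_style_model(vocab_size=30000, d_model=128,
                                     num_heads=4, num_layers=2,
                                     dff=512, max_seq_len=128)
small_bert.summary()
```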
Performance Optimization Tips
```python
# 1. Mixed-precision training
policy = keras.mixed_precision.Policy('mixed_float16')
keras.mixed_precision.set_global_policy(policy)
# Note: with mixed precision, keep the final logits layer in float32
# (e.g. dtype='float32' on the last Dense) for numerical stability.

# 2. Gradient accumulation: sum gradients over several micro-batches and
#    apply one optimizer step, useful when GPU memory limits the batch size.
#    Here `inp` and `tar` are lists of micro-batches.
def train_step_with_accumulation(inp, tar, transformer, optimizer,
                                 accumulation_steps=4):
    accumulated_gradients = None
    for i in range(accumulation_steps):
        tar_inp = tar[i][:, :-1]
        tar_real = tar[i][:, 1:]
        enc_padding_mask, combined_mask, dec_padding_mask = create_masks(
            inp[i], tar_inp)
        with tf.GradientTape() as tape:
            # Forward pass
            predictions, _ = transformer(inp[i], tar_inp, True,
                                         enc_padding_mask, combined_mask,
                                         dec_padding_mask)
            loss = loss_function(tar_real, predictions) / accumulation_steps
        # Compute and accumulate the gradients
        gradients = tape.gradient(loss, transformer.trainable_variables)
        if accumulated_gradients is None:
            accumulated_gradients = gradients
        else:
            accumulated_gradients = [acc_grad + grad for acc_grad, grad in
                                     zip(accumulated_gradients, gradients)]
    # Apply the accumulated gradients in a single optimizer step
    optimizer.apply_gradients(zip(accumulated_gradients,
                                  transformer.trainable_variables))

# 3. Learning-rate warmup with cosine decay
def warmup_cosine_decay(step, total_steps, warmup_steps, max_lr):
    if step < warmup_steps:
        # Linear warmup up to max_lr
        return max_lr * step / warmup_steps
    else:
        # Cosine decay from max_lr down to 0
        progress = (step - warmup_steps) / (total_steps - warmup_steps)
        return max_lr * 0.5 * (1 + tf.cos(np.pi * progress))
```
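A few sampled values show the warmup and decay phases (illustrative numbers only):

```python
for s in [0, 500, 1000, 5000, 10000]:
    lr = warmup_cosine_decay(s, total_steps=10000, warmup_steps=1000,
                             max_lr=1e-3)
    print(s, float(lr))  # 0 at step 0, peaks at step 1000, decays to 0
```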
Attention Visualization
```python
def plot_attention_weights(attention, sentence, result, layer):
    """Visualize the attention weights of one decoder layer.

    Assumes subword tokenizers `tokenizer_pt` / `tokenizer_en` (e.g. from a
    Portuguese-English translation dataset) are in scope.
    """
    fig = plt.figure(figsize=(16, 8))
    sentence = sentence[0]
    attention = tf.squeeze(attention[layer], axis=0)
    for head in range(attention.shape[0]):
        ax = fig.add_subplot(2, 4, head + 1)
        # Plot the attention weights for this head
        ax.matshow(attention[head][:-1, :], cmap='Blues')
        fontdict = {'fontsize': 10}
        ax.set_xticks(range(len(sentence) + 2))
        ax.set_yticks(range(len(result)))
        ax.set_ylim(len(result) - 1.5, -0.5)
        ax.set_xticklabels(
            ['<start>'] + [tokenizer_pt.decode([i]) for i in sentence] + ['<end>'],
            fontdict=fontdict, rotation=90)
        ax.set_yticklabels([tokenizer_en.decode([i]) for i in result
                            if i < tokenizer_en.vocab_size],
                           fontdict=fontdict)
        ax.set_xlabel('Head {}'.format(head + 1))
    plt.tight_layout()
    plt.show()
```
Practical Advice
1. Model selection
- Small datasets: fine-tune a pretrained model
- Large datasets: train from scratch or continue pretraining
- Real-time applications: consider model compression and optimization
2. Hyperparameter tuning
- Learning rate: use warmup and decay (see the schedules above)
- Batch size: adjust to fit GPU memory
- Layer count and model dimensions: balance quality against compute cost
3. Data processing
- Choose an appropriate sequence length
- Apply data augmentation techniques
- Tune the vocabulary size
Summary
The Transformer has fundamentally reshaped deep learning, most visibly in NLP. Its parallelism, its capacity for modeling long-range dependencies, and the interpretability offered by its attention weights have made it a core component of modern AI systems. A solid grasp of the Transformer's principles and implementation is essential for any deep learning practitioner.
In the next chapter we will study Generative Adversarial Networks (GANs) and explore another important branch of generative modeling.