The Transformer Model

The Transformer is a neural network architecture based on the attention mechanism, introduced by Vaswani et al. in the 2017 paper "Attention Is All You Need". It revolutionized natural language processing and has become a cornerstone of modern NLP.

Transformer Fundamentals

What Is a Transformer?

The Transformer is built entirely on attention, dispensing with the recurrent and convolutional structures of earlier sequence models. It can process all positions in a sequence in parallel, which greatly improves training efficiency, and it excels at modeling long-range dependencies.
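
At its core is scaled dot-product attention: given query, key, and value matrices Q, K, and V, it computes Attention(Q, K, V) = softmax(QK^T / sqrt(d_k)) V, where d_k is the key dimension. Dividing by sqrt(d_k) keeps the dot products from growing so large that the softmax saturates. The code below implements exactly this, along with the multi-head variant that runs several attention operations in parallel subspaces.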

Core Components

python
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt

# Core component of the Transformer: multi-head attention
class MultiHeadAttention(keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        
        assert d_model % self.num_heads == 0
        
        self.depth = d_model // self.num_heads
        
        self.wq = keras.layers.Dense(d_model)
        self.wk = keras.layers.Dense(d_model)
        self.wv = keras.layers.Dense(d_model)
        
        self.dense = keras.layers.Dense(d_model)
        
    def split_heads(self, x, batch_size):
        # Reshape to (batch, seq_len, num_heads, depth) and transpose to
        # (batch, num_heads, seq_len, depth) so attention runs per head
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])
    
    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]
        
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)
        
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)
        
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)
        
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        
        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))
        
        output = self.dense(concat_attention)
        
        return output, attention_weights

def scaled_dot_product_attention(q, k, v, mask):
    """计算注意力权重"""
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    
    # Scale by sqrt(d_k)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
    
    # Apply the mask: masked positions get large negative logits
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)
    
    # softmax
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    
    output = tf.matmul(attention_weights, v)
    
    return output, attention_weights
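
# Tiny sanity check (illustrative values): the query matches the second
# key almost exclusively, so the output is ~ the second value row.
temp_k = tf.constant([[10, 0, 0],
                      [0, 10, 0],
                      [0, 0, 10],
                      [0, 0, 10]], dtype=tf.float32)  # (4, 3)
temp_v = tf.constant([[1, 0],
                      [10, 0],
                      [100, 5],
                      [1000, 6]], dtype=tf.float32)   # (4, 2)
temp_q = tf.constant([[0, 10, 0]], dtype=tf.float32)  # (1, 3)
temp_out, temp_weights = scaled_dot_product_attention(temp_q, temp_k, temp_v, None)
print(temp_weights)  # ~[[0, 1, 0, 0]]
print(temp_out)      # ~[[10, 0]]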

Positional Encoding
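
Because the model contains no recurrence or convolution, position information is injected by adding fixed sinusoidal encodings to the embeddings: PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) and PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)). The code below computes exactly these values.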

python
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                          np.arange(d_model)[np.newaxis, :],
                          d_model)
    
    # Apply sin to even indices (2i)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    
    # Apply cos to odd indices (2i+1)
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    
    pos_encoding = angle_rads[np.newaxis, ...]
    
    return tf.cast(pos_encoding, dtype=tf.float32)

# Visualize the positional encoding
def plot_positional_encoding(pos_encoding):
    plt.figure(figsize=(15, 5))
    plt.pcolormesh(pos_encoding[0], cmap='RdYlBu')
    plt.xlabel('Depth')
    plt.xlim((0, 512))
    plt.ylabel('Position')
    plt.colorbar()
    plt.title('Positional Encoding')
    plt.show()

# Example
pos_encoding = positional_encoding(50, 512)
print(pos_encoding.shape)  # (1, 50, 512)
# plot_positional_encoding(pos_encoding)

Position-wise Feed-Forward Network

python
def point_wise_feed_forward_network(d_model, dff):
    return keras.Sequential([
        keras.layers.Dense(dff, activation='relu'),
        keras.layers.Dense(d_model)
    ])

# Example
sample_ffn = point_wise_feed_forward_network(512, 2048)
print(sample_ffn(tf.random.uniform((64, 50, 512))).shape)  # (64, 50, 512)

Encoder Layer

python
class EncoderLayer(keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()
        
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)
    
    def call(self, x, training, mask):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)
        
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)
        
        return out2

# Test the encoder layer
sample_encoder_layer = EncoderLayer(512, 8, 2048)
sample_encoder_layer_output = sample_encoder_layer(
    tf.random.uniform((64, 43, 512)), False, None)
print(sample_encoder_layer_output.shape)  # (64, 43, 512)

Decoder Layer

python
class DecoderLayer(keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()
        
        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)
        
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = keras.layers.LayerNormalization(epsilon=1e-6)
        
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)
        self.dropout3 = keras.layers.Dropout(rate)
    
    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # Masked self-attention over the target sequence
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)
        
        # Encoder-decoder (cross) attention
        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)
        
        # Feed-forward network
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)
        
        return out3, attn_weights_block1, attn_weights_block2
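
# Quick smoke test, mirroring the encoder-layer check above; it reuses
# sample_encoder_layer_output as the encoder output (shapes illustrative).
sample_decoder_layer = DecoderLayer(512, 8, 2048)
sample_decoder_layer_output, _, _ = sample_decoder_layer(
    tf.random.uniform((64, 50, 512)), sample_encoder_layer_output,
    False, None, None)
print(sample_decoder_layer_output.shape)  # (64, 50, 512)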

The Complete Transformer Model

python
class Encoder(keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()
        
        self.d_model = d_model
        self.num_layers = num_layers
        
        self.embedding = keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, 
                                               self.d_model)
        
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) 
                          for _ in range(num_layers)]
        
        self.dropout = keras.layers.Dropout(rate)
    
    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]
        
        # Token embedding plus positional encoding
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        
        x = self.dropout(x, training=training)
        
        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)
        
        return x

class Decoder(keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
                 maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()
        
        self.d_model = d_model
        self.num_layers = num_layers
        
        self.embedding = keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
        
        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) 
                          for _ in range(num_layers)]
        self.dropout = keras.layers.Dropout(rate)
    
    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}
        
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        
        x = self.dropout(x, training=training)
        
        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                                   look_ahead_mask, padding_mask)
            
            attention_weights[f'decoder_layer{i+1}_block1'] = block1
            attention_weights[f'decoder_layer{i+1}_block2'] = block2
        
        return x, attention_weights

class Transformer(keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, 
                 target_vocab_size, pe_input, pe_target, rate=0.1):
        super(Transformer, self).__init__()
        
        self.encoder = Encoder(num_layers, d_model, num_heads, dff, 
                              input_vocab_size, pe_input, rate)
        
        self.decoder = Decoder(num_layers, d_model, num_heads, dff, 
                              target_vocab_size, pe_target, rate)
        
        self.final_layer = keras.layers.Dense(target_vocab_size)
    
    def call(self, inp, tar, training, enc_padding_mask, 
             look_ahead_mask, dec_padding_mask):
        
        enc_output = self.encoder(inp, training, enc_padding_mask)
        
        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, look_ahead_mask, dec_padding_mask)
        
        final_output = self.final_layer(dec_output)
        
        return final_output, attention_weights
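
# Smoke test: wire up a small Transformer on random token IDs and check
# the output shape (hyperparameters here are illustrative only).
sample_transformer = Transformer(
    num_layers=2, d_model=512, num_heads=8, dff=2048,
    input_vocab_size=8500, target_vocab_size=8000,
    pe_input=10000, pe_target=6000)

temp_input = tf.random.uniform((64, 38), minval=0, maxval=200, dtype=tf.int64)
temp_target = tf.random.uniform((64, 36), minval=0, maxval=200, dtype=tf.int64)

fn_out, _ = sample_transformer(temp_input, temp_target, False, None, None, None)
print(fn_out.shape)  # (64, 36, 8000) = (batch, target_seq_len, target_vocab_size)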

Masking

python
def create_padding_mask(seq):
    # 1.0 marks padding positions (token id 0); extra axes make the mask
    # broadcastable against the (batch, heads, seq_q, seq_k) attention logits
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
    # Upper-triangular matrix of 1s: position i may not attend to j > i
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask

def create_masks(inp, tar):
    # Padding mask for the encoder
    enc_padding_mask = create_padding_mask(inp)
    
    # Padding mask for the decoder's second (cross) attention block
    dec_padding_mask = create_padding_mask(inp)
    
    # Look-ahead mask for the decoder's first attention block
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
    
    return enc_padding_mask, combined_mask, dec_padding_mask
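
# Example on a toy batch where 0 is the padding token.
demo_inp = tf.constant([[7, 6, 0, 0, 1], [1, 2, 3, 0, 0]])
demo_tar = tf.constant([[1, 2, 0], [3, 0, 0]])
enc_mask, combined_mask, dec_mask = create_masks(demo_inp, demo_tar)
print(enc_mask.shape)       # (2, 1, 1, 5)
print(combined_mask.shape)  # (2, 1, 3, 3) -- padding + look-ahead combined
print(dec_mask.shape)       # (2, 1, 1, 5)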

Training Setup

python
class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()
        
        self.d_model = tf.cast(d_model, tf.float32)
        
        self.warmup_steps = warmup_steps
    
    def __call__(self, step):
        step = tf.cast(step, tf.float32)  # Keras passes an integer step count
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# Learning-rate schedule from the original paper
learning_rate = CustomSchedule(512)

optimizer = keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, 
                                 epsilon=1e-9)

# Loss function (per-token; padding positions are masked out below)
loss_object = keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)
    
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    
    return tf.reduce_sum(loss_)/tf.reduce_sum(mask)

# Metrics
train_loss = keras.metrics.Mean(name='train_loss')
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
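
# Sanity check: the learning rate ramps up linearly for warmup_steps,
# then decays in proportion to 1/sqrt(step).
demo_steps = tf.range(1, 40000, dtype=tf.float32)
demo_lrs = learning_rate(demo_steps)
print(float(tf.reduce_max(demo_lrs)))  # peak is reached around step 4000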

Text Classification Example

python
def create_transformer_classifier(vocab_size, d_model=128, num_heads=8, 
                                 num_layers=4, dff=512, max_seq_len=512, 
                                 num_classes=2):
    """
    创建用于文本分类的Transformer模型
    """
    inputs = keras.layers.Input(shape=(max_seq_len,))
    
    # Token embedding
    embedding = keras.layers.Embedding(vocab_size, d_model)(inputs)
    
    # Positional encoding
    pos_encoding = positional_encoding(max_seq_len, d_model)
    embedding += pos_encoding[:, :max_seq_len, :]
    
    # Transformer encoder layers
    x = embedding
    for _ in range(num_layers):
        # Multi-head self-attention (using the built-in Keras layer)
        attn_output = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model//num_heads
        )(x, x)
        
        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + attn_output)
        
        # Feed-forward network
        ffn_output = keras.layers.Dense(dff, activation='relu')(x)
        ffn_output = keras.layers.Dense(d_model)(ffn_output)
        
        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + ffn_output)
    
    # Global average pooling over the sequence
    pooled = keras.layers.GlobalAveragePooling1D()(x)
    
    # Classification head
    outputs = keras.layers.Dense(num_classes, activation='softmax')(pooled)
    
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Build and compile the classifier
classifier = create_transformer_classifier(vocab_size=10000)
classifier.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

classifier.summary()
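
# Illustrative run on random token IDs; real use would tokenize and pad
# actual text to max_seq_len.
X_demo = np.random.randint(0, 10000, size=(32, 512))
y_demo = np.random.randint(0, 2, size=(32,))
classifier.fit(X_demo, y_demo, epochs=1, batch_size=8, verbose=0)
print(classifier.predict(X_demo[:2]).shape)  # (2, 2) class probabilities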

Machine Translation Example

python
def create_translation_model():
    """
    创建机器翻译模型
    """
    # Model hyperparameters
    num_layers = 4
    d_model = 128
    dff = 512
    num_heads = 8
    
    input_vocab_size = 8500
    target_vocab_size = 8000
    
    # Instantiate the Transformer
    transformer = Transformer(
        num_layers, d_model, num_heads, dff,
        input_vocab_size, target_vocab_size, 
        pe_input=input_vocab_size, 
        pe_target=target_vocab_size
    )
    
    return transformer

# One training step (teacher forcing: the decoder sees the target shifted right)
@tf.function
def train_step(inp, tar, transformer, optimizer):
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]
    
    enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar_inp)
    
    with tf.GradientTape() as tape:
        predictions, _ = transformer(inp, tar_inp, 
                                   True, 
                                   enc_padding_mask, 
                                   combined_mask, 
                                   dec_padding_mask)
        loss = loss_function(tar_real, predictions)
    
    gradients = tape.gradient(loss, transformer.trainable_variables)    
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
    
    train_loss(loss)
    train_accuracy(tar_real, predictions)
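
# Minimal end-to-end sketch on random token IDs; a real pipeline would
# iterate over batches from a tokenized tf.data.Dataset instead.
translation_model = create_translation_model()
dummy_inp = tf.random.uniform((64, 38), minval=1, maxval=8500, dtype=tf.int64)
dummy_tar = tf.random.uniform((64, 36), minval=1, maxval=8000, dtype=tf.int64)
for epoch in range(2):
    train_loss.reset_states()
    train_accuracy.reset_states()
    train_step(dummy_inp, dummy_tar, translation_model, optimizer)
    print(f'Epoch {epoch + 1}: loss {float(train_loss.result()):.4f}, '
          f'accuracy {float(train_accuracy.result()):.4f}')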

A BERT-Style Pretraining Model

python
def create_bert_style_model(vocab_size, d_model=768, num_heads=12, 
                           num_layers=12, dff=3072, max_seq_len=512):
    """
    创建BERT风格的预训练模型
    """
    inputs = keras.layers.Input(shape=(max_seq_len,))
    
    # Token embedding
    embedding = keras.layers.Embedding(vocab_size, d_model)(inputs)
    
    # Positional encoding
    pos_encoding = positional_encoding(max_seq_len, d_model)
    embedding += pos_encoding[:, :max_seq_len, :]
    
    # Transformer encoder layers
    x = embedding
    for _ in range(num_layers):
        # Multi-head self-attention
        attn_output = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model//num_heads
        )(x, x)
        
        # Dropout and residual connection
        attn_output = keras.layers.Dropout(0.1)(attn_output)
        x = keras.layers.LayerNormalization()(x + attn_output)
        
        # Feed-forward network (GELU activation, as in BERT)
        ffn_output = keras.layers.Dense(dff, activation='gelu')(x)
        ffn_output = keras.layers.Dense(d_model)(ffn_output)
        ffn_output = keras.layers.Dropout(0.1)(ffn_output)
        
        # Residual connection and layer normalization
        x = keras.layers.LayerNormalization()(x + ffn_output)
    
    # Output heads for different tasks
    # MLM head (masked language modeling)
    mlm_output = keras.layers.Dense(vocab_size)(x)
    
    # Classification head (sentence-level tasks, read from the first token)
    cls_output = keras.layers.Dense(2, activation='softmax')(x[:, 0, :])
    
    model = keras.Model(inputs=inputs, outputs=[mlm_output, cls_output])
    return model
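
# Illustrative forward pass with a much smaller configuration than
# BERT-base, just to verify the two output heads.
bert_demo = create_bert_style_model(vocab_size=10000, d_model=128,
                                    num_heads=4, num_layers=2,
                                    dff=512, max_seq_len=128)
demo_ids = np.random.randint(0, 10000, size=(2, 128))
mlm_logits, cls_probs = bert_demo(demo_ids)
print(mlm_logits.shape)  # (2, 128, 10000): per-token vocabulary logits
print(cls_probs.shape)   # (2, 2): sentence-level prediction from token 0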

Performance Optimization Tips

python
# 1. Mixed-precision training
policy = keras.mixed_precision.Policy('mixed_float16')
keras.mixed_precision.set_global_policy(policy)

# 2. Gradient accumulation (inp and tar are sequences of micro-batches)
def train_step_with_accumulation(inp, tar, transformer, optimizer, accumulation_steps=4):
    accumulated_gradients = []
    
    for i in range(accumulation_steps):
        tar_inp = tar[i][:, :-1]   # decoder input: target shifted right
        tar_real = tar[i][:, 1:]   # labels: target shifted left
        with tf.GradientTape() as tape:
            # Forward pass (attention masks omitted here for brevity)
            predictions, _ = transformer(inp[i], tar_inp, True, None, None, None)
            loss = loss_function(tar_real, predictions) / accumulation_steps
        
        # Compute gradients for this micro-batch
        gradients = tape.gradient(loss, transformer.trainable_variables)
        
        if i == 0:
            accumulated_gradients = gradients
        else:
            accumulated_gradients = [acc_grad + grad for acc_grad, grad in 
                                   zip(accumulated_gradients, gradients)]
    
    # Apply the accumulated gradients in a single update
    optimizer.apply_gradients(zip(accumulated_gradients, transformer.trainable_variables))

# 3. Learning-rate warmup with cosine decay
def warmup_cosine_decay(step, total_steps, warmup_steps, max_lr):
    if step < warmup_steps:
        return max_lr * step / warmup_steps
    else:
        progress = (step - warmup_steps) / (total_steps - warmup_steps)
        return max_lr * 0.5 * (1 + tf.cos(np.pi * progress))
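
# Quick check of the schedule (values illustrative): linear ramp during
# warmup, cosine decay toward zero afterwards.
for s in [100.0, 1000.0, 5000.0, 10000.0]:
    lr = warmup_cosine_decay(s, total_steps=10000, warmup_steps=1000, max_lr=1e-3)
    print(s, float(lr))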

Attention Visualization

python
def plot_attention_weights(attention, sentence, result, layer):
    """Plot per-head attention weights for one translated sentence.

    Assumes tokenizer_pt / tokenizer_en subword tokenizers from a
    translation pipeline (e.g. Portuguese-to-English).
    """
    fig = plt.figure(figsize=(16, 8))
    
    sentence = sentence[0]
    
    attention = tf.squeeze(attention[layer], axis=0)
    
    for head in range(attention.shape[0]):
        ax = fig.add_subplot(2, 4, head + 1)  # the 2x4 grid assumes 8 heads
        
        # Plot this head's attention weights
        ax.matshow(attention[head][:-1, :], cmap='Blues')
        
        fontdict = {'fontsize': 10}
        
        ax.set_xticks(range(len(sentence)+2))
        ax.set_yticks(range(len(result)))
        
        ax.set_ylim(len(result)-1.5, -0.5)
        
        ax.set_xticklabels(
            ['<start>']+[tokenizer_pt.decode([i]) for i in sentence]+['<end>'], 
            fontdict=fontdict, rotation=90)
        
        ax.set_yticklabels([tokenizer_en.decode([i]) for i in result 
                           if i < tokenizer_en.vocab_size], 
                          fontdict=fontdict)
        
        ax.set_xlabel('Head {}'.format(head+1))
    
    plt.tight_layout()
    plt.show()

Practical Recommendations

1. Model selection

  • Small datasets: fine-tune a pretrained model
  • Large datasets: train from scratch or continue pretraining
  • Real-time applications: consider model compression and optimization

2. Hyperparameter tuning

  • Learning rate: use warmup and decay
  • Batch size: adjust to fit GPU memory
  • Depth and width: balance quality against compute cost

3. Data processing

  • Choose an appropriate sequence length
  • Apply data augmentation techniques where they help
  • Tune the vocabulary size

Summary

The Transformer has fundamentally changed deep learning, above all in NLP. Its parallelism, strong long-range dependency modeling, and the interpretability offered by attention weights make it a core component of modern AI systems. Understanding how it works and how to implement it is essential for any deep learning practitioner.

In the next chapter we turn to generative adversarial networks (GANs) and explore another major branch of generative modeling.
