Skip to content

TensorFlow 训练与优化

训练基础概念

深度学习模型的训练是一个迭代优化过程,通过最小化损失函数来学习数据中的模式。理解训练过程的各个组件对于构建有效的模型至关重要。

python
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Fix the random seeds so results are reproducible across runs.
tf.random.set_seed(42)
np.random.seed(42)

print(f"TensorFlow版本: {tf.__version__}")

损失函数

分类任务损失函数

python
# 二分类损失函数
def demonstrate_binary_losses():
    """Show binary cross-entropy computed from probabilities and from logits."""
    # Ground-truth labels and predicted probabilities for five examples.
    labels = tf.constant([0, 1, 1, 0, 1], dtype=tf.float32)
    probs = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7], dtype=tf.float32)

    # Cross-entropy on probabilities.
    bce_from_probs = tf.keras.losses.binary_crossentropy(labels, probs)
    print(f"二元交叉熵: {bce_from_probs}")

    # Cross-entropy straight from raw scores — numerically more stable than
    # applying a sigmoid first.
    raw_scores = tf.constant([-2.2, 2.2, 1.4, -1.4, 0.8])
    bce_from_logits = tf.keras.losses.binary_crossentropy(
        labels, raw_scores, from_logits=True
    )
    print(f"带logits的二元交叉熵: {bce_from_logits}")

demonstrate_binary_losses()

# 多分类损失函数
def demonstrate_multiclass_losses():
    """Show sparse vs. one-hot categorical cross-entropy on the same logits."""
    # Integer class labels and the corresponding unnormalized class scores.
    class_ids = tf.constant([0, 1, 2, 1, 0])
    logits = tf.constant([
        [2.0, 0.5, 0.1],
        [0.1, 2.5, 0.2],
        [0.2, 0.3, 2.1],
        [0.8, 1.9, 0.4],
        [1.8, 0.6, 0.3]
    ])

    # Sparse variant: labels stay plain integers.
    sparse_loss = tf.keras.losses.sparse_categorical_crossentropy(
        class_ids, logits, from_logits=True
    )
    print(f"稀疏分类交叉熵: {sparse_loss}")

    # Dense variant: identical math, but labels are one-hot encoded first.
    onehot_labels = tf.one_hot(class_ids, depth=3)
    dense_loss = tf.keras.losses.categorical_crossentropy(
        onehot_labels, logits, from_logits=True
    )
    print(f"分类交叉熵: {dense_loss}")

demonstrate_multiclass_losses()

回归任务损失函数

python
def demonstrate_regression_losses():
    """Print common regression losses for one ground-truth/prediction pair."""
    targets = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0])
    estimates = tf.constant([1.1, 1.9, 3.2, 3.8, 5.1])

    # Mean squared error.
    print(f"均方误差: {tf.keras.losses.mean_squared_error(targets, estimates)}")

    # Mean absolute error.
    print(f"平均绝对误差: {tf.keras.losses.mean_absolute_error(targets, estimates)}")

    # Huber behaves like MSE near zero and like MAE for large residuals,
    # making it more robust to outliers.
    print(f"Huber损失: {tf.keras.losses.Huber(delta=1.0)(targets, estimates)}")

    # Mean squared logarithmic error.
    print(f"均方对数误差: {tf.keras.losses.mean_squared_logarithmic_error(targets, estimates)}")

demonstrate_regression_losses()

自定义损失函数

python
def focal_loss(alpha=0.25, gamma=2.0):
    """Build a binary focal loss (Lin et al., 2017) for class-imbalanced data.

    Args:
        alpha: Weight for the positive class; the negative class gets 1 - alpha.
        gamma: Focusing exponent; larger values down-weight easy examples more.

    Returns:
        A function ``loss(y_true, y_pred)`` expecting probabilities in
        ``y_pred`` and returning the scalar mean focal loss.
    """
    def loss_function(y_true, y_pred):
        # Clip probabilities away from exact 0/1 so the log below stays finite.
        eps = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, eps, 1.0 - eps)

        # p_t: probability assigned to the true class of each example.
        p_t = y_true * y_pred + (1 - y_true) * (1 - y_pred)

        # Per-example cross-entropy, -log(p_t).  The original code used
        # tf.keras.losses.binary_crossentropy here, which already reduces
        # over the example axis, so each example's modulating factor was
        # multiplied by an aggregated scalar — mathematically wrong.
        ce = -tf.math.log(p_t)

        # alpha_t: class-balancing weight per example.
        alpha_t = y_true * alpha + (1 - y_true) * (1 - alpha)

        # Modulating factor (1 - p_t)^gamma down-weights easy examples.
        per_example_loss = alpha_t * tf.pow(1 - p_t, gamma) * ce

        return tf.reduce_mean(per_example_loss)

    return loss_function

# Try the custom loss on a small example.
custom_focal = focal_loss(alpha=0.25, gamma=2.0)
y_true_test = tf.constant([0, 1, 1, 0, 1], dtype=tf.float32)
y_pred_test = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7], dtype=tf.float32)
focal_result = custom_focal(y_true_test, y_pred_test)
print(f"Focal Loss: {focal_result}")

# Compare against plain binary cross-entropy.
standard_bce = tf.keras.losses.binary_crossentropy(y_true_test, y_pred_test)
print(f"标准二元交叉熵: {tf.reduce_mean(standard_bce)}")

优化器

基础优化器

python
def compare_optimizers():
    """Minimize a simple quadratic with several optimizers and plot convergence."""

    def objective(v):
        # Quadratic bowl with its minimum at (2, 2).
        return tf.reduce_sum(tf.square(v - 2.0))

    candidates = {
        'SGD': tf.keras.optimizers.SGD(learning_rate=0.1),
        'SGD+Momentum': tf.keras.optimizers.SGD(learning_rate=0.1, momentum=0.9),
        'Adam': tf.keras.optimizers.Adam(learning_rate=0.1),
        'RMSprop': tf.keras.optimizers.RMSprop(learning_rate=0.1),
        'AdaGrad': tf.keras.optimizers.Adagrad(learning_rate=0.1),
    }

    results = {}
    for name, opt in candidates.items():
        # Fresh starting point for each optimizer.
        params = tf.Variable([0.0, 0.0], dtype=tf.float32)
        trace = []
        for _ in range(50):
            with tf.GradientTape() as tape:
                loss = objective(params)
            grads = tape.gradient(loss, [params])
            opt.apply_gradients(zip(grads, [params]))
            trace.append(loss.numpy())
        results[name] = trace

    # Plot every optimizer's loss trajectory on a log scale.
    plt.figure(figsize=(12, 8))
    for name, trace in results.items():
        plt.plot(trace, label=name, linewidth=2)
    plt.xlabel('迭代次数')
    plt.ylabel('损失值')
    plt.title('不同优化器的收敛过程')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.yscale('log')
    plt.show()

    return results

optimizer_results = compare_optimizers()

学习率调度

python
def demonstrate_learning_rate_schedules():
    """Plot four built-in Keras learning-rate schedules over 500 steps."""
    # Name -> schedule, in the order they should appear in the 2x2 grid.
    schedules = {
        '指数衰减': tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=0.1,
            decay_steps=100,
            decay_rate=0.96,
            staircase=True
        ),
        '多项式衰减': tf.keras.optimizers.schedules.PolynomialDecay(
            initial_learning_rate=0.1,
            decay_steps=1000,
            end_learning_rate=0.01,
            power=0.5
        ),
        '分段常数': tf.keras.optimizers.schedules.PiecewiseConstantDecay(
            boundaries=[100, 200, 300],
            values=[0.1, 0.05, 0.01, 0.005]
        ),
        '余弦衰减': tf.keras.optimizers.schedules.CosineDecay(
            initial_learning_rate=0.1,
            decay_steps=1000
        ),
    }

    steps = range(500)
    plt.figure(figsize=(15, 10))

    for plot_idx, (label, schedule) in enumerate(schedules.items(), 1):
        plt.subplot(2, 2, plot_idx)
        plt.plot(steps, [schedule(s).numpy() for s in steps], linewidth=2)
        plt.title(f'{label}学习率调度')
        plt.xlabel('步数')
        plt.ylabel('学习率')
        plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

demonstrate_learning_rate_schedules()

# 自定义学习率调度
class WarmupCosineDecay(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linear warmup followed by cosine decay down to ``min_learning_rate``.

    For the first ``warmup_steps`` steps the rate rises linearly from 0 to
    ``initial_learning_rate``; afterwards it follows a cosine curve that
    reaches ``min_learning_rate`` at ``total_steps``.
    """

    def __init__(self, warmup_steps, total_steps, initial_learning_rate, min_learning_rate=0.0):
        super(WarmupCosineDecay, self).__init__()
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.initial_learning_rate = initial_learning_rate
        self.min_learning_rate = min_learning_rate

    def __call__(self, step):
        # Keras passes `step` as an integer tensor during training (and the
        # demo below passes plain ints).  Cast to float32 up front: the
        # original code mixed int and float tensor arithmetic (a TypeError
        # under TF) and could return a plain Python float from the warmup
        # branch, breaking callers that expect a tensor (e.g. `.numpy()`).
        step = tf.cast(step, tf.float32)

        # Linear warmup phase.
        warmup_lr = self.initial_learning_rate * step / self.warmup_steps

        # Cosine decay phase.
        cosine_lr = self.min_learning_rate + (self.initial_learning_rate - self.min_learning_rate) * \
                   0.5 * (1 + tf.cos(tf.constant(np.pi) * (step - self.warmup_steps) / (self.total_steps - self.warmup_steps)))

        return tf.cond(step < self.warmup_steps, lambda: warmup_lr, lambda: cosine_lr)

    def get_config(self):
        # Enables serialization alongside the optimizer/model.
        return {
            'warmup_steps': self.warmup_steps,
            'total_steps': self.total_steps,
            'initial_learning_rate': self.initial_learning_rate,
            'min_learning_rate': self.min_learning_rate,
        }

# Exercise the custom scheduler over 1000 steps and plot the resulting curve.
custom_schedule = WarmupCosineDecay(
    warmup_steps=100,
    total_steps=1000,
    initial_learning_rate=0.001,
    min_learning_rate=0.0001
)

steps = range(1000)
custom_lr_values = [custom_schedule(step).numpy() for step in steps]

plt.figure(figsize=(10, 6))
plt.plot(steps, custom_lr_values, linewidth=2, color='red')
plt.title('自定义Warmup + 余弦衰减学习率调度')
plt.xlabel('步数')
plt.ylabel('学习率')
plt.grid(True, alpha=0.3)
plt.show()

训练循环

基本训练循环

python
def basic_training_loop():
    """Train a small MLP on synthetic binary-classification data.

    Writes out a manual GradientTape loop: per-epoch mini-batch training,
    a full-test-set validation pass, metric bookkeeping, and a final
    matplotlib summary of loss/accuracy curves.

    Returns:
        (model, (train_losses, train_accuracies, val_losses, val_accuracies))
    """
    
    # Synthetic dataset: 1000 samples, 20 features, binary labels.
    X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Convert everything to float32 TensorFlow tensors.
    X_train = tf.constant(X_train, dtype=tf.float32)
    y_train = tf.constant(y_train, dtype=tf.float32)
    X_test = tf.constant(X_test, dtype=tf.float32)
    y_test = tf.constant(y_test, dtype=tf.float32)
    
    # Simple MLP: 20 -> 64 -> 32 -> 1 with dropout after the first layer.
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    
    # Optimizer and loss.
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.BinaryCrossentropy()
    
    # Training hyperparameters.
    epochs = 100
    batch_size = 32
    
    # Per-epoch history buffers.
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []
    
    # Shuffled, batched training pipeline.
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    train_dataset = train_dataset.shuffle(1000).batch(batch_size)
    
    print("开始训练...")
    
    for epoch in range(epochs):
        # --- Training phase: running sums over this epoch's batches ---
        epoch_loss = 0
        epoch_accuracy = 0
        num_batches = 0
        
        for batch_x, batch_y in train_dataset:
            with tf.GradientTape() as tape:
                predictions = model(batch_x, training=True)
                loss = loss_fn(batch_y, predictions)
            
            # Backpropagate and apply the parameter update.
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            
            # Batch accuracy at a 0.5 threshold; batch_y is expanded to
            # (batch, 1) to match the model's output shape.
            predicted_classes = tf.cast(predictions > 0.5, tf.float32)
            accuracy = tf.reduce_mean(tf.cast(tf.equal(predicted_classes, tf.expand_dims(batch_y, 1)), tf.float32))
            
            epoch_loss += loss
            epoch_accuracy += accuracy
            num_batches += 1
        
        # Average the running sums over the batches of this epoch.
        avg_train_loss = epoch_loss / num_batches
        avg_train_accuracy = epoch_accuracy / num_batches
        
        # --- Validation phase: whole test set in one forward pass ---
        val_predictions = model(X_test, training=False)
        val_loss = loss_fn(y_test, val_predictions)
        val_predicted_classes = tf.cast(val_predictions > 0.5, tf.float32)
        val_accuracy = tf.reduce_mean(tf.cast(tf.equal(val_predicted_classes, tf.expand_dims(y_test, 1)), tf.float32))
        
        # Record history as plain Python floats.
        train_losses.append(avg_train_loss.numpy())
        train_accuracies.append(avg_train_accuracy.numpy())
        val_losses.append(val_loss.numpy())
        val_accuracies.append(val_accuracy.numpy())
        
        # Progress report every 10 epochs.
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_accuracy:.4f}, "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")
    
    # Visualize the training run.
    plt.figure(figsize=(15, 5))
    
    plt.subplot(1, 3, 1)
    plt.plot(train_losses, label='训练损失', linewidth=2)
    plt.plot(val_losses, label='验证损失', linewidth=2)
    plt.title('损失变化')
    plt.xlabel('Epoch')
    plt.ylabel('损失')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 3, 2)
    plt.plot(train_accuracies, label='训练准确率', linewidth=2)
    plt.plot(val_accuracies, label='验证准确率', linewidth=2)
    plt.title('准确率变化')
    plt.xlabel('Epoch')
    plt.ylabel('准确率')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    # Train-minus-validation loss as a crude overfitting signal.
    plt.subplot(1, 3, 3)
    plt.plot(np.array(train_losses) - np.array(val_losses), linewidth=2, color='red')
    plt.title('过拟合监控 (训练损失 - 验证损失)')
    plt.xlabel('Epoch')
    plt.ylabel('损失差值')
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return model, (train_losses, train_accuracies, val_losses, val_accuracies)

trained_model, training_history = basic_training_loop()

高级训练技巧

python
class AdvancedTrainer:
    """Hand-rolled training driver with tf.function-compiled steps.

    Adds per-gradient norm clipping, streaming Keras metrics, early stopping
    on validation loss, and best-weights checkpointing/restoration.
    """

    def __init__(self, model, optimizer, loss_fn):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        
        # Streaming metrics; reset at the start of each epoch in train().
        self.train_loss = tf.keras.metrics.Mean()
        self.train_accuracy = tf.keras.metrics.BinaryAccuracy()
        self.val_loss = tf.keras.metrics.Mean()
        self.val_accuracy = tf.keras.metrics.BinaryAccuracy()
    
    @tf.function
    def train_step(self, x, y):
        """Run one gradient update on a batch and update training metrics."""
        with tf.GradientTape() as tape:
            predictions = self.model(x, training=True)
            loss = self.loss_fn(y, predictions)
        
        gradients = tape.gradient(loss, self.model.trainable_variables)
        
        # Clip each gradient tensor's norm to 1.0 to damp exploding gradients.
        gradients = [tf.clip_by_norm(grad, 1.0) for grad in gradients]
        
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        
        self.train_loss.update_state(loss)
        self.train_accuracy.update_state(y, predictions)
        
        return loss
    
    @tf.function
    def val_step(self, x, y):
        """Evaluate one batch (no parameter updates) and update val metrics."""
        predictions = self.model(x, training=False)
        loss = self.loss_fn(y, predictions)
        
        self.val_loss.update_state(loss)
        self.val_accuracy.update_state(y, predictions)
        
        return loss
    
    def train(self, train_dataset, val_dataset, epochs, patience=10):
        """Train until `epochs` or until val loss stalls for `patience` epochs.

        Args:
            train_dataset: tf.data.Dataset yielding (x, y) training batches.
            val_dataset: tf.data.Dataset yielding (x, y) validation batches.
            epochs: Maximum number of epochs to run.
            patience: Consecutive non-improving epochs allowed before stopping.

        Returns:
            Dict with per-epoch lists: 'train_loss', 'train_accuracy',
            'val_loss', 'val_accuracy'.
        """
        
        best_val_loss = float('inf')
        patience_counter = 0
        
        history = {
            'train_loss': [],
            'train_accuracy': [],
            'val_loss': [],
            'val_accuracy': []
        }
        
        for epoch in range(epochs):
            # Reset streaming metrics so each epoch's numbers are independent.
            # NOTE(review): Keras 3 renamed reset_states() to reset_state() —
            # confirm against the installed TF/Keras version.
            self.train_loss.reset_states()
            self.train_accuracy.reset_states()
            self.val_loss.reset_states()
            self.val_accuracy.reset_states()
            
            # Training pass.
            for x_batch, y_batch in train_dataset:
                self.train_step(x_batch, y_batch)
            
            # Validation pass.
            for x_batch, y_batch in val_dataset:
                self.val_step(x_batch, y_batch)
            
            # Snapshot this epoch's aggregated metrics.
            train_loss = self.train_loss.result()
            train_acc = self.train_accuracy.result()
            val_loss = self.val_loss.result()
            val_acc = self.val_accuracy.result()
            
            history['train_loss'].append(train_loss.numpy())
            history['train_accuracy'].append(train_acc.numpy())
            history['val_loss'].append(val_loss.numpy())
            history['val_accuracy'].append(val_acc.numpy())
            
            # Early-stopping bookkeeping: track the best validation loss.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Checkpoint the best weights seen so far.
                self.model.save_weights('best_model_weights.h5')
            else:
                patience_counter += 1
            
            # Progress report every 10 epochs.
            if epoch % 10 == 0:
                print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                      f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
            
            # Stop once patience is exhausted.
            if patience_counter >= patience:
                print(f"早停在第 {epoch} 轮")
                break
        
        # Restore the best weights before returning.
        self.model.load_weights('best_model_weights.h5')
        
        return history

# Drive AdvancedTrainer end to end on synthetic data.
def advanced_training_demo():
    """Build data, model and optimizer, then train via AdvancedTrainer."""
    # Synthetic binary-classification data.
    features, labels = make_classification(n_samples=2000, n_features=20, n_classes=2, random_state=42)
    X_tr, X_te, y_tr, y_te = train_test_split(features, labels, test_size=0.2, random_state=42)

    # Standardize with statistics computed on the training split only.
    feat_mean = np.mean(X_tr, axis=0)
    feat_std = np.std(X_tr, axis=0)
    X_tr = (X_tr - feat_mean) / feat_std
    X_te = (X_te - feat_mean) / feat_std

    # Batched, shuffled, prefetched input pipelines.
    train_ds = tf.data.Dataset.from_tensor_slices((X_tr.astype(np.float32), y_tr.astype(np.float32)))
    train_ds = train_ds.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)

    val_ds = tf.data.Dataset.from_tensor_slices((X_te.astype(np.float32), y_te.astype(np.float32)))
    val_ds = val_ds.batch(32).prefetch(tf.data.AUTOTUNE)

    # MLP with batch norm + dropout after each hidden layer.
    net = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(20,)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    # Adam with an exponentially decaying learning rate.
    schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.001,
        decay_steps=100,
        decay_rate=0.96
    )
    trainer = AdvancedTrainer(
        net,
        tf.keras.optimizers.Adam(learning_rate=schedule),
        tf.keras.losses.BinaryCrossentropy()
    )

    run_history = trainer.train(train_ds, val_ds, epochs=200, patience=15)
    return net, run_history

advanced_model, advanced_history = advanced_training_demo()

正则化技术

Dropout和批量归一化

python
def regularization_comparison():
    """Compare no regularization, Dropout, BatchNorm, and both combined.

    Trains four MLPs of identical width on a deliberately overfit-prone
    dataset (50 features, only 10 informative), plots train/validation loss
    for each configuration, and prints final validation accuracies.

    Returns:
        Dict mapping configuration name to its Keras History.history dict.
    """
    
    # Overfit-prone data: few samples, mostly redundant features.
    X, y = make_classification(n_samples=500, n_features=50, n_informative=10, 
                             n_redundant=40, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    
    # Standardize with training-set statistics.
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std
    
    # Four configurations sharing the same 256-128-64 architecture.
    models = {
        '无正则化': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        
        'Dropout': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        
        '批量归一化': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        
        'Dropout + 批量归一化': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
    }
    
    results = {}
    
    plt.figure(figsize=(15, 10))
    
    for i, (name, model) in enumerate(models.items(), 1):
        # Compile every model identically.
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        # Train silently; curves are plotted afterwards.
        history = model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )
        
        results[name] = history.history
        
        # One subplot per configuration.
        plt.subplot(2, 2, i)
        plt.plot(history.history['loss'], label='训练损失', linewidth=2)
        plt.plot(history.history['val_loss'], label='验证损失', linewidth=2)
        plt.title(f'{name}')
        plt.xlabel('Epoch')
        plt.ylabel('损失')
        plt.legend()
        plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Final validation accuracy per configuration.
    print("最终验证准确率比较:")
    for name, history in results.items():
        final_val_acc = history['val_accuracy'][-1]
        print(f"{name}: {final_val_acc:.4f}")
    
    return results

regularization_results = regularization_comparison()

L1和L2正则化

python
def weight_regularization_demo():
    """Compare L1 / L2 / elastic-net kernel regularization on the same MLP.

    Trains four otherwise-identical models, then plots training/validation
    loss, the first layer's weight distribution, and L1/L2 weight norms.

    Returns:
        Dict mapping regularizer name to {'history': ..., 'model': ...}.
    """
    
    # Data with few informative and many redundant features.
    X, y = make_classification(n_samples=800, n_features=30, n_informative=5, 
                             n_redundant=25, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Standardize with training-set statistics only.  The original code
    # normalized the test split with its own mean/std, which leaks test
    # statistics into evaluation and skews the comparison.
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std
    
    # Regularizer configurations to compare.
    regularizers = {
        '无正则化': None,
        'L1 (0.01)': tf.keras.regularizers.l1(0.01),
        'L2 (0.01)': tf.keras.regularizers.l2(0.01),
        'L1+L2 (0.01)': tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01)
    }
    
    results = {}
    
    for name, regularizer in regularizers.items():
        # Same architecture for every run; only the kernel regularizer varies.
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu', input_shape=(30,),
                                kernel_regularizer=regularizer),
            tf.keras.layers.Dense(64, activation='relu',
                                kernel_regularizer=regularizer),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
        
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        # Train silently; results are visualized below.
        history = model.fit(
            X_train, y_train,
            epochs=150,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )
        
        results[name] = {
            'history': history.history,
            'model': model
        }
    
    # Visualization: 2x2 dashboard.
    plt.figure(figsize=(15, 10))
    
    # Training-loss curves.
    plt.subplot(2, 2, 1)
    for name, result in results.items():
        plt.plot(result['history']['loss'], label=f'{name} (训练)', linewidth=2)
    plt.title('训练损失')
    plt.xlabel('Epoch')
    plt.ylabel('损失')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    # Validation-loss curves.
    plt.subplot(2, 2, 2)
    for name, result in results.items():
        plt.plot(result['history']['val_loss'], label=f'{name} (验证)', linewidth=2)
    plt.title('验证损失')
    plt.xlabel('Epoch')
    plt.ylabel('损失')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    # First-layer weight distributions (regularized models only).
    plt.subplot(2, 2, 3)
    for name, result in results.items():
        if name != '无正则化':
            weights = result['model'].layers[0].get_weights()[0].flatten()
            plt.hist(weights, bins=30, alpha=0.7, label=name, density=True)
    plt.title('第一层权重分布')
    plt.xlabel('权重值')
    plt.ylabel('密度')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    # L1/L2 norms of the first layer's kernel, per configuration.
    plt.subplot(2, 2, 4)
    weight_norms = {}
    for name, result in results.items():
        weights = result['model'].layers[0].get_weights()[0]
        l1_norm = np.sum(np.abs(weights))
        l2_norm = np.sqrt(np.sum(weights**2))
        weight_norms[name] = {'L1': l1_norm, 'L2': l2_norm}
    
    names = list(weight_norms.keys())
    l1_norms = [weight_norms[name]['L1'] for name in names]
    l2_norms = [weight_norms[name]['L2'] for name in names]
    
    x = np.arange(len(names))
    width = 0.35
    
    plt.bar(x - width/2, l1_norms, width, label='L1范数', alpha=0.7)
    plt.bar(x + width/2, l2_norms, width, label='L2范数', alpha=0.7)
    plt.title('权重范数比较')
    plt.xlabel('模型')
    plt.ylabel('范数值')
    plt.xticks(x, names, rotation=45)
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    return results

weight_reg_results = weight_regularization_demo()

回调函数

内置回调函数

python
def demonstrate_callbacks():
    """Train a small MNIST classifier with a set of built-in Keras callbacks.

    Demonstrates EarlyStopping, ReduceLROnPlateau, ModelCheckpoint,
    TensorBoard, and CSVLogger working together.

    Returns:
        (model, history) as produced by `model.fit`.
    """
    
    # Load MNIST and flatten the 28x28 images into 784-dim [0, 1] vectors.
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
    
    # Use a small subset so the demo runs quickly.
    x_train = x_train[:5000]
    y_train = y_train[:5000]
    x_test = x_test[:1000]
    y_test = y_test[:1000]
    
    # Simple MLP classifier over the 10 digit classes.
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # NOTE: the original version also registered a LearningRateScheduler with
    # a fixed exponential schedule.  That callback resets the learning rate
    # at the start of every epoch, silently undoing any reduction applied by
    # ReduceLROnPlateau, so it was removed — use only one learning-rate
    # mechanism at a time.
    callbacks = [
        # Stop when validation loss hasn't improved for 10 epochs and roll
        # back to the best weights seen.
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),
        
        # Halve the learning rate after 5 stagnant epochs.
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            verbose=1
        ),
        
        # Keep only the checkpoint with the best validation accuracy.
        tf.keras.callbacks.ModelCheckpoint(
            filepath='best_model.h5',
            monitor='val_accuracy',
            save_best_only=True,
            verbose=1
        ),
        
        # TensorBoard logs (graph, weight histograms, images).
        tf.keras.callbacks.TensorBoard(
            log_dir='./logs',
            histogram_freq=1,
            write_graph=True,
            write_images=True
        ),
        
        # Append per-epoch metrics to a CSV file.
        tf.keras.callbacks.CSVLogger('training_log.csv')
    ]
    
    # Train with all callbacks attached.
    history = model.fit(
        x_train, y_train,
        epochs=50,
        batch_size=128,
        validation_data=(x_test, y_test),
        callbacks=callbacks,
        verbose=1
    )
    
    return model, history

# 自定义回调函数
class CustomCallback(tf.keras.callbacks.Callback):
    """Tracks per-epoch wall-clock time and stops early at 95% val accuracy."""

    def __init__(self):
        super(CustomCallback, self).__init__()
        # Seconds spent in each completed epoch.
        self.epoch_times = []

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = tf.timestamp()

    def on_epoch_end(self, epoch, logs=None):
        # Convert to a plain Python float right away: formatting an
        # EagerTensor with a spec such as f"{t:.2f}" raises TypeError on TF
        # versions whose EagerTensor lacks a spec-aware __format__.
        epoch_time = float(tf.timestamp() - self.epoch_start_time)
        self.epoch_times.append(epoch_time)

        # Report timing and the epoch's metrics.
        if logs:
            print(f"Epoch {epoch + 1} 完成,用时: {epoch_time:.2f}秒")
            print(f"训练准确率: {logs.get('accuracy', 0):.4f}, 验证准确率: {logs.get('val_accuracy', 0):.4f}")

        # Custom early stopping once validation accuracy exceeds 95%.
        if logs and logs.get('val_accuracy', 0) > 0.95:
            print("验证准确率达到95%,提前停止训练")
            self.model.stop_training = True

    def on_train_end(self, logs=None):
        # Guard the zero-epoch case, where np.mean([]) warns and yields NaN.
        if self.epoch_times:
            avg_epoch_time = np.mean(self.epoch_times)
            print(f"训练完成,平均每轮用时: {avg_epoch_time:.2f}秒")

# 使用自定义回调
def custom_callback_demo():
    """Fit a tiny model on random data to exercise CustomCallback."""
    # Random binary-classification data (no real signal; demo only).
    features_train = np.random.random((1000, 20))
    labels_train = np.random.randint(2, size=(1000, 1))
    features_val = np.random.random((200, 20))
    labels_val = np.random.randint(2, size=(200, 1))

    # Minimal one-hidden-layer classifier.
    net = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    net.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    # Attach the custom timing/early-stop callback.
    timing_callback = CustomCallback()
    fit_history = net.fit(
        features_train, labels_train,
        epochs=20,
        validation_data=(features_val, labels_val),
        callbacks=[timing_callback],
        verbose=0
    )

    return net, fit_history, timing_callback

custom_model, custom_history, custom_cb = custom_callback_demo()

模型评估与监控

训练过程监控

python
class TrainingMonitor:
    """Collects per-epoch training diagnostics and renders them as plots.

    Tracks loss/accuracy (train and validation), the learning rate, and the
    average gradient norm, then draws a 2x3 dashboard via matplotlib.
    """

    def __init__(self):
        # One list per tracked metric; each is appended to once per epoch.
        self.metrics = {
            'loss': [],
            'accuracy': [],
            'val_loss': [],
            'val_accuracy': [],
            'learning_rate': [],
            'gradient_norm': []
        }
    
    def update_metrics(self, logs, learning_rate, gradient_norm):
        """Append one epoch's values.

        Args:
            logs: Dict possibly containing 'loss', 'accuracy', 'val_loss',
                'val_accuracy'; missing keys are silently skipped.
            learning_rate: Scalar learning rate in effect this epoch.
            gradient_norm: Scalar average gradient norm for this epoch.
        """
        for key in ['loss', 'accuracy', 'val_loss', 'val_accuracy']:
            if key in logs:
                self.metrics[key].append(logs[key])
        
        self.metrics['learning_rate'].append(learning_rate)
        self.metrics['gradient_norm'].append(gradient_norm)
    
    def plot_metrics(self):
        """Render all tracked metrics in a 2x3 grid of subplots."""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        
        # Loss curves.
        axes[0, 0].plot(self.metrics['loss'], label='训练损失', linewidth=2)
        axes[0, 0].plot(self.metrics['val_loss'], label='验证损失', linewidth=2)
        axes[0, 0].set_title('损失变化')
        axes[0, 0].set_xlabel('Epoch')
        axes[0, 0].set_ylabel('损失')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        # Accuracy curves.
        axes[0, 1].plot(self.metrics['accuracy'], label='训练准确率', linewidth=2)
        axes[0, 1].plot(self.metrics['val_accuracy'], label='验证准确率', linewidth=2)
        axes[0, 1].set_title('准确率变化')
        axes[0, 1].set_xlabel('Epoch')
        axes[0, 1].set_ylabel('准确率')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # Learning-rate trajectory.
        axes[0, 2].plot(self.metrics['learning_rate'], linewidth=2, color='green')
        axes[0, 2].set_title('学习率变化')
        axes[0, 2].set_xlabel('Epoch')
        axes[0, 2].set_ylabel('学习率')
        axes[0, 2].grid(True, alpha=0.3)
        
        # Average gradient norm per epoch.
        axes[1, 0].plot(self.metrics['gradient_norm'], linewidth=2, color='red')
        axes[1, 0].set_title('梯度范数')
        axes[1, 0].set_xlabel('Epoch')
        axes[1, 0].set_ylabel('梯度范数')
        axes[1, 0].grid(True, alpha=0.3)
        
        # Overfitting signal: validation loss minus training loss.
        if len(self.metrics['loss']) > 0 and len(self.metrics['val_loss']) > 0:
            overfitting = np.array(self.metrics['val_loss']) - np.array(self.metrics['loss'])
            axes[1, 1].plot(overfitting, linewidth=2, color='orange')
            axes[1, 1].set_title('过拟合监控 (验证损失 - 训练损失)')
            axes[1, 1].set_xlabel('Epoch')
            axes[1, 1].set_ylabel('损失差值')
            axes[1, 1].grid(True, alpha=0.3)
        
        # Training stability: raw loss vs. 5-epoch moving average
        # (the x-offset of 2 centers the window over the raw curve).
        if len(self.metrics['loss']) > 10:
            loss_smoothed = np.convolve(self.metrics['loss'], np.ones(5)/5, mode='valid')
            axes[1, 2].plot(self.metrics['loss'], alpha=0.3, label='原始', linewidth=1)
            axes[1, 2].plot(range(2, len(loss_smoothed)+2), loss_smoothed, 
                          label='平滑', linewidth=2, color='blue')
            axes[1, 2].set_title('训练稳定性')
            axes[1, 2].set_xlabel('Epoch')
            axes[1, 2].set_ylabel('损失')
            axes[1, 2].legend()
            axes[1, 2].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

def monitored_training():
    """Run a manual training loop while recording rich diagnostics.

    Trains a dropout MLP on synthetic data, records per-epoch loss,
    accuracy, learning rate, and average gradient norm into a
    TrainingMonitor, and renders the monitor's dashboard at the end.

    Returns:
        (model, monitor)
    """
    
    # Synthetic binary-classification data.
    X, y = make_classification(n_samples=2000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Standardize with training-set statistics only.  The original code
    # normalized the test split with its own mean/std, which leaks test
    # statistics into evaluation.
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std
    
    # Dropout MLP: 20 -> 128 -> 64 -> 1.
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    
    # Optimizer and loss.
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.BinaryCrossentropy()
    
    # Diagnostics collector.
    monitor = TrainingMonitor()
    
    # Training configuration.
    epochs = 100
    batch_size = 32
    
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train.astype(np.float32), y_train.astype(np.float32)))
    train_dataset = train_dataset.shuffle(1000).batch(batch_size)
    
    for epoch in range(epochs):
        # --- Training pass: running sums over this epoch's batches ---
        epoch_loss = 0
        epoch_accuracy = 0
        num_batches = 0
        total_gradient_norm = 0
        
        for batch_x, batch_y in train_dataset:
            with tf.GradientTape() as tape:
                predictions = model(batch_x, training=True)
                loss = loss_fn(batch_y, predictions)
            
            gradients = tape.gradient(loss, model.trainable_variables)
            
            # Track the global gradient norm for the diagnostics dashboard.
            gradient_norm = tf.linalg.global_norm(gradients)
            total_gradient_norm += gradient_norm
            
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            
            # Batch accuracy at a 0.5 threshold; labels are expanded to match
            # the (batch, 1) prediction shape.
            predicted_classes = tf.cast(predictions > 0.5, tf.float32)
            accuracy = tf.reduce_mean(tf.cast(tf.equal(predicted_classes, tf.expand_dims(batch_y, 1)), tf.float32))
            
            epoch_loss += loss
            epoch_accuracy += accuracy
            num_batches += 1
        
        # --- Validation pass: whole test set in one forward pass ---
        val_predictions = model(X_test, training=False)
        val_loss = loss_fn(y_test, val_predictions)
        val_predicted_classes = tf.cast(val_predictions > 0.5, tf.float32)
        val_accuracy = tf.reduce_mean(tf.cast(tf.equal(val_predicted_classes, tf.expand_dims(y_test, 1)), tf.float32))
        
        # Epoch-level aggregates for the monitor.
        logs = {
            'loss': (epoch_loss / num_batches).numpy(),
            'accuracy': (epoch_accuracy / num_batches).numpy(),
            'val_loss': val_loss.numpy(),
            'val_accuracy': val_accuracy.numpy()
        }
        
        # Works for a fixed learning rate; a schedule object would need to be
        # called with the current step instead.
        current_lr = optimizer.learning_rate.numpy()
        avg_gradient_norm = (total_gradient_norm / num_batches).numpy()
        
        monitor.update_metrics(logs, current_lr, avg_gradient_norm)
        
        # Progress report every 10 epochs.
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss: {logs['loss']:.4f}, Train Acc: {logs['accuracy']:.4f}, "
                  f"Val Loss: {logs['val_loss']:.4f}, Val Acc: {logs['val_accuracy']:.4f}, "
                  f"Grad Norm: {avg_gradient_norm:.4f}")
    
    # Render the collected diagnostics.
    monitor.plot_metrics()
    
    return model, monitor

trained_model_monitored, training_monitor = monitored_training()

总结

TensorFlow训练与优化涵盖了深度学习的核心技术:

关键要点:

  1. 损失函数选择:根据任务类型选择合适的损失函数
  2. 优化器配置:理解不同优化器的特点和适用场景
  3. 学习率调度:动态调整学习率提升训练效果
  4. 正则化技术:防止过拟合,提高模型泛化能力
  5. 训练监控:实时监控训练过程,及时发现问题

最佳实践:

  • 使用适当的数据预处理和增强
  • 实施早停和模型检查点
  • 监控梯度范数防止梯度爆炸/消失
  • 使用验证集进行超参数调优
  • 可视化训练过程便于调试

掌握这些训练技巧将帮助你构建更稳定、更高效的深度学习模型!

本站内容仅供学习和研究使用。