TensorFlow 训练与优化
训练基础概念
深度学习模型的训练是一个迭代优化过程,通过最小化损失函数来学习数据中的模式。理解训练过程的各个组件对于构建有效的模型至关重要。
python
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# Fix random seeds so every run of the tutorial produces the same results.
tf.random.set_seed(42)
np.random.seed(42)

print(f"TensorFlow版本: {tf.__version__}")

损失函数
分类任务损失函数
python
# Binary-classification loss functions
def demonstrate_binary_losses():
    """Show binary cross-entropy computed from probabilities and from raw logits."""
    # Example labels and predicted probabilities
    y_true = tf.constant([0, 1, 1, 0, 1], dtype=tf.float32)
    y_pred = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7], dtype=tf.float32)

    # Binary cross-entropy on probabilities
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    print(f"二元交叉熵: {bce}")

    # Binary cross-entropy on raw logits (numerically more stable)
    logits = tf.constant([-2.2, 2.2, 1.4, -1.4, 0.8])
    bce_logits = tf.keras.losses.binary_crossentropy(
        y_true, logits, from_logits=True
    )
    print(f"带logits的二元交叉熵: {bce_logits}")

demonstrate_binary_losses()
# Multi-class loss functions
def demonstrate_multiclass_losses():
    """Compare sparse (integer-label) and one-hot categorical cross-entropy."""
    # Sparse categorical cross-entropy: labels given as integer class ids
    y_true_sparse = tf.constant([0, 1, 2, 1, 0])
    y_pred_logits = tf.constant([
        [2.0, 0.5, 0.1],
        [0.1, 2.5, 0.2],
        [0.2, 0.3, 2.1],
        [0.8, 1.9, 0.4],
        [1.8, 0.6, 0.3]
    ])
    sparse_cce = tf.keras.losses.sparse_categorical_crossentropy(
        y_true_sparse, y_pred_logits, from_logits=True
    )
    print(f"稀疏分类交叉熵: {sparse_cce}")

    # Categorical cross-entropy: labels given as one-hot vectors
    y_true_onehot = tf.one_hot(y_true_sparse, depth=3)
    cce = tf.keras.losses.categorical_crossentropy(
        y_true_onehot, y_pred_logits, from_logits=True
    )
    print(f"分类交叉熵: {cce}")

demonstrate_multiclass_losses()

回归任务损失函数
python
def demonstrate_regression_losses():
    """Show the common regression losses on a small example pair."""
    y_true = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0])
    y_pred = tf.constant([1.1, 1.9, 3.2, 3.8, 5.1])

    # Mean squared error
    mse = tf.keras.losses.mean_squared_error(y_true, y_pred)
    print(f"均方误差: {mse}")

    # Mean absolute error
    mae = tf.keras.losses.mean_absolute_error(y_true, y_pred)
    print(f"平均绝对误差: {mae}")

    # Huber loss (more robust to outliers than MSE)
    huber = tf.keras.losses.Huber(delta=1.0)(y_true, y_pred)
    print(f"Huber损失: {huber}")

    # Mean squared logarithmic error
    msle = tf.keras.losses.mean_squared_logarithmic_error(y_true, y_pred)
    print(f"均方对数误差: {msle}")

demonstrate_regression_losses()

自定义损失函数
python
def focal_loss(alpha=0.25, gamma=2.0):
    """Return a focal-loss function for imbalanced binary classification.

    Args:
        alpha: class-balancing weight applied to the positive class.
        gamma: focusing exponent; larger values down-weight easy examples.

    Returns:
        A callable ``loss_function(y_true, y_pred)`` producing a scalar loss.
    """
    def loss_function(y_true, y_pred):
        # Per-example binary cross-entropy
        ce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
        # p_t: predicted probability of the true class
        p_t = y_true * y_pred + (1 - y_true) * (1 - y_pred)
        # alpha_t: class-dependent balancing factor
        alpha_t = y_true * alpha + (1 - y_true) * (1 - alpha)
        # Modulate cross-entropy by (1 - p_t)^gamma so easy examples contribute less
        focal_loss = alpha_t * tf.pow(1 - p_t, gamma) * ce
        return tf.reduce_mean(focal_loss)
    return loss_function

# Test the custom loss function
custom_focal = focal_loss(alpha=0.25, gamma=2.0)
y_true_test = tf.constant([0, 1, 1, 0, 1], dtype=tf.float32)
y_pred_test = tf.constant([0.1, 0.9, 0.8, 0.2, 0.7], dtype=tf.float32)
focal_result = custom_focal(y_true_test, y_pred_test)
print(f"Focal Loss: {focal_result}")

# Compare against the standard cross-entropy
standard_bce = tf.keras.losses.binary_crossentropy(y_true_test, y_pred_test)
print(f"标准二元交叉熵: {tf.reduce_mean(standard_bce)}")

优化器
基础优化器
python
def compare_optimizers():
    """Compare how several optimizers converge on a simple quadratic objective.

    Returns:
        Dict mapping optimizer name -> list of loss values per step.
    """
    # Simple convex objective with minimum at x = [2, 2]
    def quadratic_function(x):
        return tf.reduce_sum(tf.square(x - 2.0))

    # Optimizers under comparison (same learning rate for fairness)
    optimizers = {
        'SGD': tf.keras.optimizers.SGD(learning_rate=0.1),
        'SGD+Momentum': tf.keras.optimizers.SGD(learning_rate=0.1, momentum=0.9),
        'Adam': tf.keras.optimizers.Adam(learning_rate=0.1),
        'RMSprop': tf.keras.optimizers.RMSprop(learning_rate=0.1),
        'AdaGrad': tf.keras.optimizers.Adagrad(learning_rate=0.1)
    }

    results = {}
    for name, optimizer in optimizers.items():
        # Fresh starting point for every optimizer
        x = tf.Variable([0.0, 0.0], dtype=tf.float32)
        history = []
        for step in range(50):
            with tf.GradientTape() as tape:
                loss = quadratic_function(x)
            gradients = tape.gradient(loss, [x])
            optimizer.apply_gradients(zip(gradients, [x]))
            history.append(loss.numpy())
        results[name] = history

    # Visualize the convergence behaviour (log scale reveals rate differences)
    plt.figure(figsize=(12, 8))
    for name, history in results.items():
        plt.plot(history, label=name, linewidth=2)
    plt.xlabel('迭代次数')
    plt.ylabel('损失值')
    plt.title('不同优化器的收敛过程')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.yscale('log')
    plt.show()
    return results

optimizer_results = compare_optimizers()

学习率调度
python
def demonstrate_learning_rate_schedules():
    """Plot four built-in Keras learning-rate schedules over 500 steps."""
    # Exponential decay: lr * decay_rate^(step / decay_steps), stepped
    exponential_decay = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.1,
        decay_steps=100,
        decay_rate=0.96,
        staircase=True
    )
    # Polynomial decay towards an explicit end learning rate
    polynomial_decay = tf.keras.optimizers.schedules.PolynomialDecay(
        initial_learning_rate=0.1,
        decay_steps=1000,
        end_learning_rate=0.01,
        power=0.5
    )
    # Piecewise constant: one value per interval between boundaries
    piecewise_constant = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
        boundaries=[100, 200, 300],
        values=[0.1, 0.05, 0.01, 0.005]
    )
    # Cosine decay from the initial learning rate down to 0
    cosine_decay = tf.keras.optimizers.schedules.CosineDecay(
        initial_learning_rate=0.1,
        decay_steps=1000
    )

    # Visualize each schedule's learning rate over time
    steps = range(500)
    plt.figure(figsize=(15, 10))
    schedules = {
        '指数衰减': exponential_decay,
        '多项式衰减': polynomial_decay,
        '分段常数': piecewise_constant,
        '余弦衰减': cosine_decay
    }
    for i, (name, schedule) in enumerate(schedules.items(), 1):
        plt.subplot(2, 2, i)
        lr_values = [schedule(step).numpy() for step in steps]
        plt.plot(steps, lr_values, linewidth=2)
        plt.title(f'{name}学习率调度')
        plt.xlabel('步数')
        plt.ylabel('学习率')
        plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

demonstrate_learning_rate_schedules()
# Custom learning-rate schedule
class WarmupCosineDecay(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Linear warmup followed by cosine decay down to ``min_learning_rate``.

    The learning rate ramps linearly from 0 to ``initial_learning_rate`` over
    ``warmup_steps``, then follows a half-cosine curve to ``min_learning_rate``
    at ``total_steps``.
    """

    def __init__(self, warmup_steps, total_steps, initial_learning_rate, min_learning_rate=0.0):
        super(WarmupCosineDecay, self).__init__()
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.initial_learning_rate = initial_learning_rate
        self.min_learning_rate = min_learning_rate

    def __call__(self, step):
        # Warmup phase: lr grows linearly with the step counter
        warmup_lr = self.initial_learning_rate * step / self.warmup_steps
        # Cosine-decay phase: half-cosine from initial lr down to min lr
        cosine_lr = self.min_learning_rate + (self.initial_learning_rate - self.min_learning_rate) * \
            0.5 * (1 + tf.cos(tf.constant(np.pi) * (step - self.warmup_steps) / (self.total_steps - self.warmup_steps)))
        # Select the phase based on the current step
        return tf.cond(step < self.warmup_steps, lambda: warmup_lr, lambda: cosine_lr)
# Test the custom scheduler
custom_schedule = WarmupCosineDecay(
    warmup_steps=100,
    total_steps=1000,
    initial_learning_rate=0.001,
    min_learning_rate=0.0001
)

steps = range(1000)
custom_lr_values = [custom_schedule(step).numpy() for step in steps]

plt.figure(figsize=(10, 6))
plt.plot(steps, custom_lr_values, linewidth=2, color='red')
plt.title('自定义Warmup + 余弦衰减学习率调度')
plt.xlabel('步数')
plt.ylabel('学习率')
plt.grid(True, alpha=0.3)
plt.show()

训练循环
基本训练循环
python
def basic_training_loop():
    """Train a small binary classifier with a hand-written training loop.

    Returns:
        Tuple of (trained model, (train_losses, train_accuracies,
        val_losses, val_accuracies)).
    """
    # Create a synthetic binary-classification dataset
    X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Convert to TensorFlow tensors
    X_train = tf.constant(X_train, dtype=tf.float32)
    y_train = tf.constant(y_train, dtype=tf.float32)
    X_test = tf.constant(X_test, dtype=tf.float32)
    y_test = tf.constant(y_test, dtype=tf.float32)

    # Build the model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    # Optimizer and loss function
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.BinaryCrossentropy()

    # Training hyperparameters
    epochs = 100
    batch_size = 32

    # History of per-epoch metrics
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []

    # Build a shuffled, batched training dataset
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    train_dataset = train_dataset.shuffle(1000).batch(batch_size)

    print("开始训练...")
    for epoch in range(epochs):
        # --- Training phase ---
        epoch_loss = 0
        epoch_accuracy = 0
        num_batches = 0
        for batch_x, batch_y in train_dataset:
            with tf.GradientTape() as tape:
                predictions = model(batch_x, training=True)
                loss = loss_fn(batch_y, predictions)
            # Compute gradients and update parameters
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            # Batch accuracy: threshold sigmoid outputs at 0.5
            predicted_classes = tf.cast(predictions > 0.5, tf.float32)
            accuracy = tf.reduce_mean(tf.cast(tf.equal(predicted_classes, tf.expand_dims(batch_y, 1)), tf.float32))
            epoch_loss += loss
            epoch_accuracy += accuracy
            num_batches += 1
        # Per-epoch averages
        avg_train_loss = epoch_loss / num_batches
        avg_train_accuracy = epoch_accuracy / num_batches

        # --- Validation phase ---
        val_predictions = model(X_test, training=False)
        val_loss = loss_fn(y_test, val_predictions)
        val_predicted_classes = tf.cast(val_predictions > 0.5, tf.float32)
        val_accuracy = tf.reduce_mean(tf.cast(tf.equal(val_predicted_classes, tf.expand_dims(y_test, 1)), tf.float32))

        # Record results
        train_losses.append(avg_train_loss.numpy())
        train_accuracies.append(avg_train_accuracy.numpy())
        val_losses.append(val_loss.numpy())
        val_accuracies.append(val_accuracy.numpy())

        # Progress report every 10 epochs
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss: {avg_train_loss:.4f}, Train Acc: {avg_train_accuracy:.4f}, "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

    # Visualize the training history
    plt.figure(figsize=(15, 5))
    plt.subplot(1, 3, 1)
    plt.plot(train_losses, label='训练损失', linewidth=2)
    plt.plot(val_losses, label='验证损失', linewidth=2)
    plt.title('损失变化')
    plt.xlabel('Epoch')
    plt.ylabel('损失')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 3, 2)
    plt.plot(train_accuracies, label='训练准确率', linewidth=2)
    plt.plot(val_accuracies, label='验证准确率', linewidth=2)
    plt.title('准确率变化')
    plt.xlabel('Epoch')
    plt.ylabel('准确率')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 3, 3)
    plt.plot(np.array(train_losses) - np.array(val_losses), linewidth=2, color='red')
    plt.title('过拟合监控 (训练损失 - 验证损失)')
    plt.xlabel('Epoch')
    plt.ylabel('损失差值')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()
    return model, (train_losses, train_accuracies, val_losses, val_accuracies)

trained_model, training_history = basic_training_loop()

高级训练技巧
python
class AdvancedTrainer:
    """Custom training driver with tf.function steps, gradient clipping and early stopping."""

    def __init__(self, model, optimizer, loss_fn):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        # Streaming metrics, reset at the start of every epoch
        self.train_loss = tf.keras.metrics.Mean()
        self.train_accuracy = tf.keras.metrics.BinaryAccuracy()
        self.val_loss = tf.keras.metrics.Mean()
        self.val_accuracy = tf.keras.metrics.BinaryAccuracy()

    @tf.function
    def train_step(self, x, y):
        """Run one optimization step on a single batch."""
        with tf.GradientTape() as tape:
            predictions = self.model(x, training=True)
            loss = self.loss_fn(y, predictions)
        gradients = tape.gradient(loss, self.model.trainable_variables)
        # Clip each gradient's norm to 1.0 to guard against exploding gradients
        gradients = [tf.clip_by_norm(grad, 1.0) for grad in gradients]
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        self.train_loss.update_state(loss)
        self.train_accuracy.update_state(y, predictions)
        return loss

    @tf.function
    def val_step(self, x, y):
        """Evaluate one validation batch without updating weights."""
        predictions = self.model(x, training=False)
        loss = self.loss_fn(y, predictions)
        self.val_loss.update_state(loss)
        self.val_accuracy.update_state(y, predictions)
        return loss

    def train(self, train_dataset, val_dataset, epochs, patience=10):
        """Train with early stopping; restores the best weights before returning.

        Args:
            train_dataset: batched tf.data.Dataset of (x, y) for training.
            val_dataset: batched tf.data.Dataset of (x, y) for validation.
            epochs: maximum number of epochs.
            patience: epochs without val-loss improvement before stopping.

        Returns:
            Dict with per-epoch 'train_loss', 'train_accuracy', 'val_loss',
            'val_accuracy' lists.
        """
        best_val_loss = float('inf')
        patience_counter = 0
        history = {
            'train_loss': [],
            'train_accuracy': [],
            'val_loss': [],
            'val_accuracy': []
        }
        for epoch in range(epochs):
            # Reset streaming metrics for this epoch
            self.train_loss.reset_states()
            self.train_accuracy.reset_states()
            self.val_loss.reset_states()
            self.val_accuracy.reset_states()

            # Training phase
            for x_batch, y_batch in train_dataset:
                self.train_step(x_batch, y_batch)
            # Validation phase
            for x_batch, y_batch in val_dataset:
                self.val_step(x_batch, y_batch)

            # Snapshot epoch metrics
            train_loss = self.train_loss.result()
            train_acc = self.train_accuracy.result()
            val_loss = self.val_loss.result()
            val_acc = self.val_accuracy.result()
            history['train_loss'].append(train_loss.numpy())
            history['train_accuracy'].append(train_acc.numpy())
            history['val_loss'].append(val_loss.numpy())
            history['val_accuracy'].append(val_acc.numpy())

            # Early-stopping bookkeeping: save weights whenever val loss improves
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                self.model.save_weights('best_model_weights.h5')
            else:
                patience_counter += 1

            # Progress report every 10 epochs
            if epoch % 10 == 0:
                print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                      f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

            # Stop once patience is exhausted
            if patience_counter >= patience:
                print(f"早停在第 {epoch} 轮")
                break

        # Restore the best checkpoint seen during training
        self.model.load_weights('best_model_weights.h5')
        return history
# Use the advanced trainer
def advanced_training_demo():
    """End-to-end demo of AdvancedTrainer on a synthetic dataset.

    Returns:
        Tuple of (trained model, training history dict).
    """
    # Create data
    X, y = make_classification(n_samples=2000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Standardize using training-set statistics only (avoids test leakage)
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std

    # Build tf.data pipelines with prefetching
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train.astype(np.float32), y_train.astype(np.float32)))
    train_dataset = train_dataset.shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
    val_dataset = tf.data.Dataset.from_tensor_slices((X_test.astype(np.float32), y_test.astype(np.float32)))
    val_dataset = val_dataset.batch(32).prefetch(tf.data.AUTOTUNE)

    # Build the model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(20,)),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    # Optimizer with exponentially-decaying learning rate
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.001,
        decay_steps=100,
        decay_rate=0.96
    )
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
    loss_fn = tf.keras.losses.BinaryCrossentropy()

    # Train via the custom trainer (early stopping with patience=15)
    trainer = AdvancedTrainer(model, optimizer, loss_fn)
    history = trainer.train(train_dataset, val_dataset, epochs=200, patience=15)
    return model, history

advanced_model, advanced_history = advanced_training_demo()

正则化技术
Dropout和批量归一化
python
def regularization_comparison():
    """Compare dropout and batch-normalization on an overfitting-prone dataset.

    Returns:
        Dict mapping configuration name -> Keras History.history dict.
    """
    # Mostly-redundant features make this dataset easy to overfit
    X, y = make_classification(n_samples=500, n_features=50, n_informative=10,
                               n_redundant=40, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # Standardize with training-set statistics
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std

    # Four model configurations sharing the same architecture
    models = {
        '无正则化': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        'Dropout': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        '批量归一化': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ]),
        'Dropout + 批量归一化': tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu', input_shape=(50,)),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.3),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
    }

    results = {}
    plt.figure(figsize=(15, 10))
    for i, (name, model) in enumerate(models.items(), 1):
        # Compile and train each configuration identically
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        history = model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )
        results[name] = history.history

        # Plot the training/validation loss curves
        plt.subplot(2, 2, i)
        plt.plot(history.history['loss'], label='训练损失', linewidth=2)
        plt.plot(history.history['val_loss'], label='验证损失', linewidth=2)
        plt.title(f'{name}')
        plt.xlabel('Epoch')
        plt.ylabel('损失')
        plt.legend()
        plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

    # Compare the final validation accuracy of each configuration
    print("最终验证准确率比较:")
    for name, history in results.items():
        final_val_acc = history['val_accuracy'][-1]
        print(f"{name}: {final_val_acc:.4f}")
    return results

regularization_results = regularization_comparison()

L1和L2正则化
python
def weight_regularization_demo():
    """Demonstrate L1/L2 weight regularization and its effect on weight norms.

    Returns:
        Dict mapping regularizer name -> {'history': ..., 'model': ...}.
    """
    # Create data with many redundant features
    X, y = make_classification(n_samples=800, n_features=30, n_informative=5,
                               n_redundant=25, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Standardize (NOTE(review): test set uses its own statistics here,
    # which slightly leaks information; training-set stats would be cleaner)
    X_train = (X_train - np.mean(X_train, axis=0)) / np.std(X_train, axis=0)
    X_test = (X_test - np.mean(X_test, axis=0)) / np.std(X_test, axis=0)

    # Regularizer configurations under comparison
    regularizers = {
        '无正则化': None,
        'L1 (0.01)': tf.keras.regularizers.l1(0.01),
        'L2 (0.01)': tf.keras.regularizers.l2(0.01),
        'L1+L2 (0.01)': tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01)
    }

    results = {}
    for name, regularizer in regularizers.items():
        # Same architecture, different kernel regularizer
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu', input_shape=(30,),
                                  kernel_regularizer=regularizer),
            tf.keras.layers.Dense(64, activation='relu',
                                  kernel_regularizer=regularizer),
            tf.keras.layers.Dense(1, activation='sigmoid')
        ])
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        history = model.fit(
            X_train, y_train,
            epochs=150,
            batch_size=32,
            validation_data=(X_test, y_test),
            verbose=0
        )
        results[name] = {
            'history': history.history,
            'model': model
        }

    # Visualize the comparison
    plt.figure(figsize=(15, 10))

    # Training loss curves
    plt.subplot(2, 2, 1)
    for name, result in results.items():
        plt.plot(result['history']['loss'], label=f'{name} (训练)', linewidth=2)
    plt.title('训练损失')
    plt.xlabel('Epoch')
    plt.ylabel('损失')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # Validation loss curves
    plt.subplot(2, 2, 2)
    for name, result in results.items():
        plt.plot(result['history']['val_loss'], label=f'{name} (验证)', linewidth=2)
    plt.title('验证损失')
    plt.xlabel('Epoch')
    plt.ylabel('损失')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # First-layer weight distributions for the regularized models
    plt.subplot(2, 2, 3)
    for name, result in results.items():
        if name != '无正则化':
            weights = result['model'].layers[0].get_weights()[0].flatten()
            plt.hist(weights, bins=30, alpha=0.7, label=name, density=True)
    plt.title('第一层权重分布')
    plt.xlabel('权重值')
    plt.ylabel('密度')
    plt.legend()
    plt.grid(True, alpha=0.3)

    # L1/L2 norms of the first-layer weights for every model
    plt.subplot(2, 2, 4)
    weight_norms = {}
    for name, result in results.items():
        weights = result['model'].layers[0].get_weights()[0]
        l1_norm = np.sum(np.abs(weights))
        l2_norm = np.sqrt(np.sum(weights**2))
        weight_norms[name] = {'L1': l1_norm, 'L2': l2_norm}
    names = list(weight_norms.keys())
    l1_norms = [weight_norms[name]['L1'] for name in names]
    l2_norms = [weight_norms[name]['L2'] for name in names]
    x = np.arange(len(names))
    width = 0.35
    plt.bar(x - width/2, l1_norms, width, label='L1范数', alpha=0.7)
    plt.bar(x + width/2, l2_norms, width, label='L2范数', alpha=0.7)
    plt.title('权重范数比较')
    plt.xlabel('模型')
    plt.ylabel('范数值')
    plt.xticks(x, names, rotation=45)
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()
    return results

weight_reg_results = weight_regularization_demo()

回调函数
内置回调函数
python
def demonstrate_callbacks():
    """Train an MNIST classifier using the common built-in Keras callbacks.

    Returns:
        Tuple of (trained model, Keras History).
    """
    # Load and flatten MNIST, scaling pixels to [0, 1]
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0

    # Use only a subset of the data to keep the demo fast
    x_train = x_train[:5000]
    y_train = y_train[:5000]
    x_test = x_test[:1000]
    y_test = y_test[:1000]

    # Build the model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    # Callback configuration
    callbacks = [
        # Early stopping: halt when validation loss stops improving
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True,
            verbose=1
        ),
        # Reduce the learning rate on a validation-loss plateau
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            verbose=1
        ),
        # Checkpoint: keep only the best model by validation accuracy
        tf.keras.callbacks.ModelCheckpoint(
            filepath='best_model.h5',
            monitor='val_accuracy',
            save_best_only=True,
            verbose=1
        ),
        # TensorBoard logging
        tf.keras.callbacks.TensorBoard(
            log_dir='./logs',
            histogram_freq=1,
            write_graph=True,
            write_images=True
        ),
        # CSV log of per-epoch metrics
        tf.keras.callbacks.CSVLogger('training_log.csv'),
        # Per-epoch learning-rate schedule (exponential decay by 0.9)
        tf.keras.callbacks.LearningRateScheduler(
            lambda epoch: 0.001 * 0.9 ** epoch,
            verbose=1
        )
    ]

    # Train with all callbacks attached
    history = model.fit(
        x_train, y_train,
        epochs=50,
        batch_size=128,
        validation_data=(x_test, y_test),
        callbacks=callbacks,
        verbose=1
    )
    return model, history
# Custom callback
class CustomCallback(tf.keras.callbacks.Callback):
    """Callback that times each epoch and stops once val accuracy exceeds 95%."""

    def __init__(self):
        super(CustomCallback, self).__init__()
        # Wall-clock duration of each completed epoch, in seconds
        self.epoch_times = []

    def on_epoch_begin(self, epoch, logs=None):
        self.epoch_start_time = tf.timestamp()

    def on_epoch_end(self, epoch, logs=None):
        epoch_time = tf.timestamp() - self.epoch_start_time
        self.epoch_times.append(epoch_time.numpy())
        # Report timing and the key metrics for this epoch
        if logs:
            print(f"Epoch {epoch + 1} 完成,用时: {epoch_time:.2f}秒")
            print(f"训练准确率: {logs.get('accuracy', 0):.4f}, 验证准确率: {logs.get('val_accuracy', 0):.4f}")
        # Custom early-stop condition on validation accuracy
        if logs and logs.get('val_accuracy', 0) > 0.95:
            print("验证准确率达到95%,提前停止训练")
            self.model.stop_training = True

    def on_train_end(self, logs=None):
        avg_epoch_time = np.mean(self.epoch_times)
        print(f"训练完成,平均每轮用时: {avg_epoch_time:.2f}秒")
# Use the custom callback
def custom_callback_demo():
    """Exercise CustomCallback on a tiny random dataset.

    Returns:
        Tuple of (model, Keras History, CustomCallback instance).
    """
    # Random data — labels are noise, so accuracy stays near chance level
    x_train = np.random.random((1000, 20))
    y_train = np.random.randint(2, size=(1000, 1))
    x_val = np.random.random((200, 20))
    y_val = np.random.randint(2, size=(200, 1))

    model = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    # Attach the custom callback
    custom_callback = CustomCallback()
    history = model.fit(
        x_train, y_train,
        epochs=20,
        validation_data=(x_val, y_val),
        callbacks=[custom_callback],
        verbose=0
    )
    return model, history, custom_callback

custom_model, custom_history, custom_cb = custom_callback_demo()

模型评估与监控
训练过程监控
python
class TrainingMonitor:
    """Collects per-epoch training metrics and renders a six-panel dashboard."""

    def __init__(self):
        # One list per tracked metric, appended once per epoch
        self.metrics = {
            'loss': [],
            'accuracy': [],
            'val_loss': [],
            'val_accuracy': [],
            'learning_rate': [],
            'gradient_norm': []
        }

    def update_metrics(self, logs, learning_rate, gradient_norm):
        """Append one epoch's metrics.

        Args:
            logs: dict possibly containing 'loss', 'accuracy', 'val_loss',
                'val_accuracy'.
            learning_rate: current optimizer learning rate.
            gradient_norm: average global gradient norm for the epoch.
        """
        for key in ['loss', 'accuracy', 'val_loss', 'val_accuracy']:
            if key in logs:
                self.metrics[key].append(logs[key])
        self.metrics['learning_rate'].append(learning_rate)
        self.metrics['gradient_norm'].append(gradient_norm)

    def plot_metrics(self):
        """Plot losses, accuracies, learning rate, gradient norm, overfitting gap and stability."""
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))

        # Loss curves
        axes[0, 0].plot(self.metrics['loss'], label='训练损失', linewidth=2)
        axes[0, 0].plot(self.metrics['val_loss'], label='验证损失', linewidth=2)
        axes[0, 0].set_title('损失变化')
        axes[0, 0].set_xlabel('Epoch')
        axes[0, 0].set_ylabel('损失')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # Accuracy curves
        axes[0, 1].plot(self.metrics['accuracy'], label='训练准确率', linewidth=2)
        axes[0, 1].plot(self.metrics['val_accuracy'], label='验证准确率', linewidth=2)
        axes[0, 1].set_title('准确率变化')
        axes[0, 1].set_xlabel('Epoch')
        axes[0, 1].set_ylabel('准确率')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)

        # Learning rate over time
        axes[0, 2].plot(self.metrics['learning_rate'], linewidth=2, color='green')
        axes[0, 2].set_title('学习率变化')
        axes[0, 2].set_xlabel('Epoch')
        axes[0, 2].set_ylabel('学习率')
        axes[0, 2].grid(True, alpha=0.3)

        # Gradient norm — watch for explosion or vanishing
        axes[1, 0].plot(self.metrics['gradient_norm'], linewidth=2, color='red')
        axes[1, 0].set_title('梯度范数')
        axes[1, 0].set_xlabel('Epoch')
        axes[1, 0].set_ylabel('梯度范数')
        axes[1, 0].grid(True, alpha=0.3)

        # Overfitting indicator: validation loss minus training loss
        if len(self.metrics['loss']) > 0 and len(self.metrics['val_loss']) > 0:
            overfitting = np.array(self.metrics['val_loss']) - np.array(self.metrics['loss'])
            axes[1, 1].plot(overfitting, linewidth=2, color='orange')
            axes[1, 1].set_title('过拟合监控 (验证损失 - 训练损失)')
            axes[1, 1].set_xlabel('Epoch')
            axes[1, 1].set_ylabel('损失差值')
            axes[1, 1].grid(True, alpha=0.3)

        # Training stability: raw loss vs 5-point moving average
        if len(self.metrics['loss']) > 10:
            loss_smoothed = np.convolve(self.metrics['loss'], np.ones(5)/5, mode='valid')
            axes[1, 2].plot(self.metrics['loss'], alpha=0.3, label='原始', linewidth=1)
            axes[1, 2].plot(range(2, len(loss_smoothed)+2), loss_smoothed,
                            label='平滑', linewidth=2, color='blue')
            axes[1, 2].set_title('训练稳定性')
            axes[1, 2].set_xlabel('Epoch')
            axes[1, 2].set_ylabel('损失')
            axes[1, 2].legend()
            axes[1, 2].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()
def monitored_training():
    """Run a manual training loop while feeding metrics into TrainingMonitor.

    Returns:
        Tuple of (trained model, TrainingMonitor with the collected metrics).
    """
    # Create data
    X, y = make_classification(n_samples=2000, n_features=20, n_classes=2, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Standardize (NOTE(review): test set uses its own statistics here,
    # which slightly leaks information; training-set stats would be cleaner)
    X_train = (X_train - np.mean(X_train, axis=0)) / np.std(X_train, axis=0)
    X_test = (X_test - np.mean(X_test, axis=0)) / np.std(X_test, axis=0)

    # Build the model
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(20,)),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    # Optimizer and loss function
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    loss_fn = tf.keras.losses.BinaryCrossentropy()

    # Metric collector
    monitor = TrainingMonitor()

    # Training loop setup
    epochs = 100
    batch_size = 32
    train_dataset = tf.data.Dataset.from_tensor_slices((X_train.astype(np.float32), y_train.astype(np.float32)))
    train_dataset = train_dataset.shuffle(1000).batch(batch_size)

    for epoch in range(epochs):
        # --- Training phase ---
        epoch_loss = 0
        epoch_accuracy = 0
        num_batches = 0
        total_gradient_norm = 0
        for batch_x, batch_y in train_dataset:
            with tf.GradientTape() as tape:
                predictions = model(batch_x, training=True)
                loss = loss_fn(batch_y, predictions)
            gradients = tape.gradient(loss, model.trainable_variables)
            # Track the global gradient norm for this batch
            gradient_norm = tf.linalg.global_norm(gradients)
            total_gradient_norm += gradient_norm
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            # Batch accuracy via 0.5 threshold on sigmoid outputs
            predicted_classes = tf.cast(predictions > 0.5, tf.float32)
            accuracy = tf.reduce_mean(tf.cast(tf.equal(predicted_classes, tf.expand_dims(batch_y, 1)), tf.float32))
            epoch_loss += loss
            epoch_accuracy += accuracy
            num_batches += 1

        # --- Validation phase ---
        val_predictions = model(X_test, training=False)
        val_loss = loss_fn(y_test, val_predictions)
        val_predicted_classes = tf.cast(val_predictions > 0.5, tf.float32)
        val_accuracy = tf.reduce_mean(tf.cast(tf.equal(val_predicted_classes, tf.expand_dims(y_test, 1)), tf.float32))

        # Feed this epoch's metrics into the monitor
        logs = {
            'loss': (epoch_loss / num_batches).numpy(),
            'accuracy': (epoch_accuracy / num_batches).numpy(),
            'val_loss': val_loss.numpy(),
            'val_accuracy': val_accuracy.numpy()
        }
        current_lr = optimizer.learning_rate.numpy()
        avg_gradient_norm = (total_gradient_norm / num_batches).numpy()
        monitor.update_metrics(logs, current_lr, avg_gradient_norm)

        # Progress report every 10 epochs
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss: {logs['loss']:.4f}, Train Acc: {logs['accuracy']:.4f}, "
                  f"Val Loss: {logs['val_loss']:.4f}, Val Acc: {logs['val_accuracy']:.4f}, "
                  f"Grad Norm: {avg_gradient_norm:.4f}")

    # Render the monitoring dashboard
    monitor.plot_metrics()
    return model, monitor

trained_model_monitored, training_monitor = monitored_training()

总结
TensorFlow训练与优化涵盖了深度学习的核心技术:
关键要点:
- 损失函数选择:根据任务类型选择合适的损失函数
- 优化器配置:理解不同优化器的特点和适用场景
- 学习率调度:动态调整学习率提升训练效果
- 正则化技术:防止过拟合,提高模型泛化能力
- 训练监控:实时监控训练过程,及时发现问题
最佳实践:
- 使用适当的数据预处理和增强
- 实施早停和模型检查点
- 监控梯度范数防止梯度爆炸/消失
- 使用验证集进行超参数调优
- 可视化训练过程便于调试
掌握这些训练技巧将帮助你构建更稳定、更高效的深度学习模型!