PyTorch Loss Functions and Optimizers
Overview of Loss Functions
A loss function measures the discrepancy between a model's predictions and the ground-truth labels; it is the quantity that training a neural network minimizes. PyTorch provides a rich set of loss functions in the torch.nn module.
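To make the role of the loss concrete, here is a minimal, self-contained sketch (random data, illustrative shapes) of the pattern every training step follows: forward pass, loss, backward pass, optimizer update.
python
import torch
import torch.nn as nn

# Illustrative example: one gradient step on a tiny linear model
model = nn.Linear(4, 1)
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

x, y = torch.randn(8, 4), torch.randn(8, 1)
loss = criterion(model(x), y)   # forward pass: measure prediction error
loss.backward()                 # backward pass: compute gradients
optimizer.step()                # update parameters
optimizer.zero_grad()           # clear gradients for the next step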
Common Loss Functions
1. Loss Functions for Classification
Cross-Entropy Loss (CrossEntropyLoss)
python
import torch
import torch.nn as nn
import torch.nn.functional as F
# Multi-class cross-entropy loss (expects raw logits; softmax is applied internally)
criterion = nn.CrossEntropyLoss()
# Example data
logits = torch.randn(32, 10)  # batch size 32, 10 classes
targets = torch.randint(0, 10, (32,))  # ground-truth class indices
loss = criterion(logits, targets)
print(f"Cross-entropy loss: {loss.item():.4f}")
# Weighted cross-entropy (to handle class imbalance)
class_weights = torch.tensor([1.0, 2.0, 1.5, 1.0, 3.0, 1.0, 1.0, 2.0, 1.0, 1.0])
weighted_criterion = nn.CrossEntropyLoss(weight=class_weights)
weighted_loss = weighted_criterion(logits, targets)
Binary Cross-Entropy Loss (BCELoss)
python
# Binary cross-entropy loss
bce_criterion = nn.BCELoss()
# BCELoss expects probabilities, so apply sigmoid first
sigmoid_outputs = torch.sigmoid(torch.randn(32, 1))
binary_targets = torch.randint(0, 2, (32, 1)).float()
bce_loss = bce_criterion(sigmoid_outputs, binary_targets)
# BCEWithLogitsLoss (sigmoid built in, numerically more stable)
bce_logits_criterion = nn.BCEWithLogitsLoss()
raw_logits = torch.randn(32, 1)
bce_logits_loss = bce_logits_criterion(raw_logits, binary_targets)
print(f"BCE loss: {bce_loss.item():.4f}")
print(f"BCE with logits loss: {bce_logits_loss.item():.4f}")
Negative Log-Likelihood Loss (NLLLoss)
python
# Negative log-likelihood loss (usually paired with LogSoftmax)
nll_criterion = nn.NLLLoss()
# Apply log_softmax first
log_probs = F.log_softmax(logits, dim=1)
nll_loss = nll_criterion(log_probs, targets)
print(f"NLL loss: {nll_loss.item():.4f}")
2. Loss Functions for Regression
Mean Squared Error Loss (MSELoss)
python
# Mean squared error loss
mse_criterion = nn.MSELoss()
predictions = torch.randn(32, 1)
reg_targets = torch.randn(32, 1)  # separate name so the classification `targets` above is not overwritten
mse_loss = mse_criterion(predictions, reg_targets)
print(f"MSE loss: {mse_loss.item():.4f}")
# Mean absolute error loss
mae_criterion = nn.L1Loss()
mae_loss = mae_criterion(predictions, reg_targets)
print(f"MAE loss: {mae_loss.item():.4f}")
Smooth L1 Loss (SmoothL1Loss)
python
# Smooth L1 loss (closely related to the Huber loss)
smooth_l1_criterion = nn.SmoothL1Loss()
smooth_l1_loss = smooth_l1_criterion(predictions, reg_targets)
print(f"Smooth L1 loss: {smooth_l1_loss.item():.4f}")
3. Advanced Loss Functions
Focal Loss (for class imbalance)
python
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        # Per-sample cross-entropy, then down-weight easy examples by (1 - p_t)^gamma
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)  # probability assigned to the true class
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

# Using Focal Loss
focal_criterion = FocalLoss(alpha=1, gamma=2)
focal_loss = focal_criterion(logits, targets)
print(f"Focal loss: {focal_loss.item():.4f}")
Label Smoothing
python
class LabelSmoothingLoss(nn.Module):
    def __init__(self, num_classes, smoothing=0.1):
        super(LabelSmoothingLoss, self).__init__()
        self.num_classes = num_classes
        self.smoothing = smoothing
        self.confidence = 1.0 - smoothing

    def forward(self, pred, target):
        pred = F.log_softmax(pred, dim=1)
        # Soft target distribution: `confidence` on the true class, the rest spread evenly
        true_dist = torch.zeros_like(pred)
        true_dist.fill_(self.smoothing / (self.num_classes - 1))
        true_dist.scatter_(1, target.unsqueeze(1), self.confidence)
        return torch.mean(torch.sum(-true_dist * pred, dim=1))

# Using label smoothing
label_smooth_criterion = LabelSmoothingLoss(num_classes=10, smoothing=0.1)
label_smooth_loss = label_smooth_criterion(logits, targets)
print(f"Label smoothing loss: {label_smooth_loss.item():.4f}")
Optimizers
1. Basic Optimizers
SGD (Stochastic Gradient Descent)
python
import torch.optim as optim
# Build a model
model = nn.Sequential(
    nn.Linear(784, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
)
# SGD optimizer
sgd_optimizer = optim.SGD(
    model.parameters(),
    lr=0.01,            # learning rate
    momentum=0.9,       # momentum
    weight_decay=1e-4,  # weight decay (L2 regularization)
    nesterov=True       # Nesterov momentum
)
print(f"SGD optimizer: {sgd_optimizer}")
Adam Optimizer
python
# Adam optimizer
adam_optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,            # learning rate
    betas=(0.9, 0.999),  # coefficients for the moment estimates
    eps=1e-8,            # term added for numerical stability
    weight_decay=1e-4,   # weight decay
    amsgrad=False        # whether to use the AMSGrad variant
)
# AdamW optimizer (decoupled weight decay)
adamw_optimizer = optim.AdamW(
    model.parameters(),
    lr=0.001,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0.01,   # decoupled weight decay, which is more effective in AdamW
    amsgrad=False
)
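Optimizers also accept parameter groups, which makes it easy to give different parts of a model different hyperparameters. A small sketch using the Sequential model above (the layer indices and learning rates here are illustrative):
python
# Per-parameter-group settings: smaller lr for the first layer, larger for the output layer
grouped_optimizer = optim.AdamW([
    {'params': model[0].parameters(), 'lr': 1e-4},
    {'params': model[2].parameters(), 'lr': 1e-3},
], weight_decay=0.01)
print([g['lr'] for g in grouped_optimizer.param_groups])  # [0.0001, 0.001]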
2. Advanced Optimizers
RMSprop
python
rmsprop_optimizer = optim.RMSprop(
    model.parameters(),
    lr=0.01,
    alpha=0.99,      # smoothing constant
    eps=1e-8,
    weight_decay=0,
    momentum=0,
    centered=False
)
AdaGrad
python
adagrad_optimizer = optim.Adagrad(
    model.parameters(),
    lr=0.01,
    lr_decay=0,
    weight_decay=0,
    initial_accumulator_value=0,
    eps=1e-10
)
3. Custom Optimizers
python
class CustomSGD(optim.Optimizer):
    def __init__(self, params, lr=1e-3, momentum=0, dampening=0, weight_decay=0):
        defaults = dict(lr=lr, momentum=momentum, dampening=dampening, weight_decay=weight_decay)
        super(CustomSGD, self).__init__(params, defaults)

    def step(self, closure=None):
        loss = None
        if closure is not None:
            loss = closure()
        for group in self.param_groups:
            weight_decay = group['weight_decay']
            momentum = group['momentum']
            dampening = group['dampening']
            for p in group['params']:
                if p.grad is None:
                    continue
                d_p = p.grad.data
                if weight_decay != 0:
                    # L2 regularization: add weight_decay * p to the gradient
                    d_p.add_(p.data, alpha=weight_decay)
                if momentum != 0:
                    param_state = self.state[p]
                    if len(param_state) == 0:
                        param_state['momentum_buffer'] = torch.zeros_like(p.data)
                    buf = param_state['momentum_buffer']
                    buf.mul_(momentum).add_(d_p, alpha=1 - dampening)
                    d_p = buf
                # Gradient descent update
                p.data.add_(d_p, alpha=-group['lr'])
        return loss
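The custom optimizer is used exactly like the built-in ones; a minimal usage sketch:
python
custom_optimizer = CustomSGD(model.parameters(), lr=0.01, momentum=0.9)
# In a training step: custom_optimizer.zero_grad(); loss.backward(); custom_optimizer.step()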
Learning Rate Scheduling
1. Basic Schedulers
python
from torch.optim.lr_scheduler import (
    StepLR, MultiStepLR, ExponentialLR, ReduceLROnPlateau,
    CosineAnnealingLR, CosineAnnealingWarmRestarts
)
# Step decay
step_scheduler = StepLR(
    optimizer=adam_optimizer,
    step_size=30,  # decay every 30 epochs
    gamma=0.1      # decay factor
)
# Multi-step decay
multistep_scheduler = MultiStepLR(
    optimizer=adam_optimizer,
    milestones=[30, 60, 90],  # decay at these epochs
    gamma=0.1
)
# Exponential decay
exp_scheduler = ExponentialLR(
    optimizer=adam_optimizer,
    gamma=0.95  # multiply the lr by 0.95 every epoch
)
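These schedulers wrap an optimizer and are stepped once per epoch, after the optimizer has updated the parameters. A structural sketch (the per-batch training code is elided):
python
for epoch in range(100):
    # ... run the training batches: optimizer.zero_grad(); loss.backward(); optimizer.step() ...
    step_scheduler.step()                       # advance the schedule once per epoch
    print(epoch, step_scheduler.get_last_lr())  # inspect the current learning rate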
2. Adaptive Schedulers
python
# Scheduler driven by a validation metric
plateau_scheduler = ReduceLROnPlateau(
    optimizer=adam_optimizer,
    mode='min',            # the monitored metric should decrease
    factor=0.5,            # decay factor
    patience=10,           # epochs to wait without improvement before decaying
    verbose=True,          # print a message on each decay (deprecated in recent PyTorch versions)
    threshold=0.0001,      # minimum change that counts as an improvement
    threshold_mode='rel',
    cooldown=0,            # cooldown period after a decay
    min_lr=0,              # lower bound on the learning rate
    eps=1e-8
)
# Cosine annealing scheduler
cosine_scheduler = CosineAnnealingLR(
    optimizer=adam_optimizer,
    T_max=100,   # number of epochs over which the lr anneals to eta_min
    eta_min=0    # minimum learning rate
)
# Cosine annealing with warm restarts
cosine_restart_scheduler = CosineAnnealingWarmRestarts(
    optimizer=adam_optimizer,
    T_0=10,      # number of epochs before the first restart
    T_mult=2,    # factor by which each restart period grows
    eta_min=0
)
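Unlike the other schedulers, ReduceLROnPlateau must be stepped with the monitored metric; a minimal sketch (val_loss stands for the validation loss computed each epoch):
python
# Pass the monitored value explicitly; the lr decays when it stops improving
val_loss = 0.42  # placeholder value for illustration
plateau_scheduler.step(val_loss)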
3. Custom Schedulers
python
class WarmupLR(torch.optim.lr_scheduler._LRScheduler):
    def __init__(self, optimizer, warmup_epochs, last_epoch=-1):
        self.warmup_epochs = warmup_epochs
        super(WarmupLR, self).__init__(optimizer, last_epoch)

    def get_lr(self):
        if self.last_epoch < self.warmup_epochs:
            # Linear warmup from base_lr / warmup_epochs up to base_lr
            return [base_lr * (self.last_epoch + 1) / self.warmup_epochs
                    for base_lr in self.base_lrs]
        else:
            return self.base_lrs

# Combining schedulers
def create_scheduler(optimizer, warmup_epochs=5, total_epochs=100):
    warmup_scheduler = WarmupLR(optimizer, warmup_epochs)
    main_scheduler = CosineAnnealingLR(optimizer, T_max=total_epochs - warmup_epochs)
    return warmup_scheduler, main_scheduler
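One way to drive the two schedulers returned above is to step the warmup scheduler during the warmup epochs and the cosine scheduler afterwards (a rough sketch; in recent PyTorch versions, chaining them with torch.optim.lr_scheduler.SequentialLR is the cleaner option):
python
warmup_sched, main_sched = create_scheduler(adam_optimizer, warmup_epochs=5, total_epochs=100)
for epoch in range(100):
    # ... train one epoch here ...
    if epoch < 5:
        warmup_sched.step()
    else:
        main_sched.step()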
Training Loop Examples
1. Basic Training Loop
python
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for batch_idx, (data, target) in enumerate(dataloader):
        data, target = data.to(device), target.to(device)
        # Reset gradients
        optimizer.zero_grad()
        # Forward pass
        output = model(data)
        loss = criterion(output, target)
        # Backward pass
        loss.backward()
        # Gradient clipping (optional)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        # Parameter update
        optimizer.step()
        # Statistics
        total_loss += loss.item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        total += target.size(0)
    avg_loss = total_loss / len(dataloader)
    accuracy = 100. * correct / total
    return avg_loss, accuracy

def validate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            total_loss += loss.item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += target.size(0)
    avg_loss = total_loss / len(dataloader)
    accuracy = 100. * correct / total
    return avg_loss, accuracy
2. Complete Training Pipeline
python
def train_model(model, train_loader, val_loader, num_epochs=100):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
    # Learning rate scheduler
    scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs)
    # Training history
    train_losses, train_accs = [], []
    val_losses, val_accs = [], []
    best_val_acc = 0
    for epoch in range(num_epochs):
        # Train
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        # Validate
        val_loss, val_acc = validate(model, val_loader, criterion, device)
        # Update the learning rate
        scheduler.step()
        # Record history
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        # Save the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')
        # Print progress
        if epoch % 10 == 0:
            print(f'Epoch {epoch}/{num_epochs}:')
            print(f'  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
            print(f'  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
            print(f'  LR: {optimizer.param_groups[0]["lr"]:.6f}')
    return train_losses, train_accs, val_losses, val_accs
Advanced Training Techniques
1. Gradient Accumulation
python
def train_with_gradient_accumulation(model, dataloader, criterion, optimizer,
                                     device, accumulation_steps=4):
    model.train()
    optimizer.zero_grad()
    for batch_idx, (data, target) in enumerate(dataloader):
        data, target = data.to(device), target.to(device)
        # Forward pass
        output = model(data)
        loss = criterion(output, target)
        # Scale the loss so gradients average over the accumulated batches
        loss = loss / accumulation_steps
        # Backward pass (gradients accumulate across iterations)
        loss.backward()
        # Update the parameters every `accumulation_steps` batches
        if (batch_idx + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
2. Mixed-Precision Training
python
from torch.cuda.amp import GradScaler, autocast

def train_with_mixed_precision(model, dataloader, criterion, optimizer, device):
    scaler = GradScaler()
    model.train()
    for data, target in dataloader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Run the forward pass under autocast
        with autocast():
            output = model(data)
            loss = criterion(output, target)
        # Scale the loss and backpropagate
        scaler.scale(loss).backward()
        # Step the optimizer through the scaler and update its scale factor
        scaler.step(optimizer)
        scaler.update()
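If gradient clipping is combined with mixed precision, the gradients must be unscaled before clipping. A sketch of a variant of the function above (the function name and max_norm default are illustrative):
python
def train_amp_with_clipping(model, dataloader, criterion, optimizer, device, max_norm=1.0):
    scaler = GradScaler()
    model.train()
    for data, target in dataloader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        with autocast():
            loss = criterion(model(data), target)
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)  # bring gradients back to fp32 scale before clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_norm)
        scaler.step(optimizer)      # skips the step if gradients contain inf/nan
        scaler.update()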
3. Early Stopping
python
import copy

class EarlyStopping:
    def __init__(self, patience=7, min_delta=0, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.best_loss = None
        self.counter = 0
        self.best_weights = None

    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.save_checkpoint(model)
        elif val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            self.save_checkpoint(model)
        else:
            self.counter += 1
            if self.counter >= self.patience:
                if self.restore_best_weights:
                    model.load_state_dict(self.best_weights)
                return True
        return False

    def save_checkpoint(self, model):
        # Deep-copy the weights; state_dict() alone keeps references to the live tensors
        self.best_weights = copy.deepcopy(model.state_dict())

# Using early stopping
early_stopping = EarlyStopping(patience=10)
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    if early_stopping(val_loss, model):
        print(f"Early stopping at epoch {epoch}")
        break
Optimization Tips
1. Choosing the Right Optimizer
python
# Suggested optimizer choices for different task types
def get_optimizer(model, task_type='classification'):
    if task_type == 'classification':
        return optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
    elif task_type == 'regression':
        return optim.Adam(model.parameters(), lr=0.001)
    elif task_type == 'gan':
        return optim.Adam(model.parameters(), lr=0.0002, betas=(0.5, 0.999))
    else:
        return optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
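A quick usage sketch of the helper above (the learning rates it returns are common defaults, not universally optimal values):
python
optimizer = get_optimizer(model, task_type='classification')
print(type(optimizer).__name__)  # AdamW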
2. Setting the Learning Rate
python
# Learning rate finder: sweep lrs over a few batches to see where the loss falls fastest
class LRFinder:
    def __init__(self, model, optimizer, criterion, device):
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device

    def find_lr(self, dataloader, start_lr=1e-7, end_lr=10, num_iter=100):
        lrs = []
        losses = []
        lr = start_lr
        self.optimizer.param_groups[0]['lr'] = lr
        for i, (data, target) in enumerate(dataloader):
            if i >= num_iter:
                break
            data, target = data.to(self.device), target.to(self.device)
            self.optimizer.zero_grad()
            output = self.model(data)
            loss = self.criterion(output, target)
            loss.backward()
            self.optimizer.step()
            lrs.append(lr)
            losses.append(loss.item())
            # Increase the lr geometrically from start_lr toward end_lr
            lr *= (end_lr / start_lr) ** (1 / num_iter)
            self.optimizer.param_groups[0]['lr'] = lr
        return lrs, losses
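A usage sketch (train_loader and device are assumed to exist; note that the sweep updates the model's weights, so reload them afterward if you intend to keep training):
python
finder = LRFinder(model, optim.SGD(model.parameters(), lr=1e-7), nn.CrossEntropyLoss(), device)
lrs, losses = finder.find_lr(train_loader, start_lr=1e-7, end_lr=10, num_iter=100)
# A common heuristic: pick a learning rate somewhat below the point where the loss is lowest
best_lr = lrs[losses.index(min(losses))] / 10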
Summary
Loss functions and optimizers are the core components of deep learning training:
- Loss function selection: choose a loss that matches the task type
- Optimizer selection: understand the characteristics and typical use cases of each optimizer
- Learning rate scheduling: apply an appropriate learning rate schedule
- Training techniques: master gradient accumulation, mixed precision, early stopping, and related tools
- Debugging and tuning: learn to diagnose and resolve problems that arise during training
Mastering these topics will help you train better deep learning models.