
PyTorch Loss Functions and Optimizers

Loss Function Overview

A loss function measures the discrepancy between a model's predictions and the ground-truth labels, and is a core component of neural network training. PyTorch provides a rich set of loss functions in the torch.nn module.

Common Loss Functions

1. Loss Functions for Classification

Cross-Entropy Loss (CrossEntropyLoss)

python
import torch
import torch.nn as nn
import torch.nn.functional as F

# Multi-class cross-entropy loss
criterion = nn.CrossEntropyLoss()

# Example data
logits = torch.randn(32, 10)  # batch size 32, 10 classes
targets = torch.randint(0, 10, (32,))  # ground-truth labels

loss = criterion(logits, targets)
print(f"Cross-entropy loss: {loss.item():.4f}")

# Weighted cross-entropy (for class imbalance)
class_weights = torch.tensor([1.0, 2.0, 1.5, 1.0, 3.0, 1.0, 1.0, 2.0, 1.0, 1.0])
weighted_criterion = nn.CrossEntropyLoss(weight=class_weights)
weighted_loss = weighted_criterion(logits, targets)

Binary Cross-Entropy Loss (BCELoss)

python
# Binary cross-entropy loss
bce_criterion = nn.BCELoss()

# BCELoss expects probabilities, so apply sigmoid first
sigmoid_outputs = torch.sigmoid(torch.randn(32, 1))
binary_targets = torch.randint(0, 2, (32, 1)).float()

bce_loss = bce_criterion(sigmoid_outputs, binary_targets)

# BCEWithLogitsLoss (sigmoid built in, numerically more stable)
bce_logits_criterion = nn.BCEWithLogitsLoss()
raw_logits = torch.randn(32, 1)
bce_logits_loss = bce_logits_criterion(raw_logits, binary_targets)

print(f"BCE loss: {bce_loss.item():.4f}")
print(f"BCE with logits loss: {bce_logits_loss.item():.4f}")
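
For imbalanced binary labels, nn.BCEWithLogitsLoss also accepts a pos_weight argument that up-weights the positive class. A minimal sketch reusing the tensors above (the value 3.0 is only an illustrative assumption about the negative-to-positive ratio):

python
# pos_weight > 1 penalizes missed positives more heavily
pos_weight = torch.tensor([3.0])  # assumed negative/positive ratio, one entry per output
imbalanced_criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
imbalanced_loss = imbalanced_criterion(raw_logits, binary_targets)
print(f"BCE with pos_weight: {imbalanced_loss.item():.4f}")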

Negative Log-Likelihood Loss (NLLLoss)

python
# Negative log-likelihood loss (usually paired with LogSoftmax)
nll_criterion = nn.NLLLoss()

# Apply log_softmax first
log_probs = F.log_softmax(logits, dim=1)
nll_loss = nll_criterion(log_probs, targets)

print(f"NLL loss: {nll_loss.item():.4f}")
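
nn.CrossEntropyLoss is exactly log_softmax followed by NLLLoss, so the value computed here should match the cross-entropy loss from the earlier example; a quick check:

python
# CrossEntropyLoss == LogSoftmax + NLLLoss on the same logits and targets
print(torch.allclose(nll_loss, criterion(logits, targets)))  # True (up to floating-point error)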

2. Loss Functions for Regression

Mean Squared Error Loss (MSELoss)

python
# Mean squared error loss
mse_criterion = nn.MSELoss()

# Use separate variable names so the classification `targets` above is not overwritten
reg_predictions = torch.randn(32, 1)
reg_targets = torch.randn(32, 1)

mse_loss = mse_criterion(reg_predictions, reg_targets)
print(f"MSE loss: {mse_loss.item():.4f}")

# Mean absolute error loss
mae_criterion = nn.L1Loss()
mae_loss = mae_criterion(reg_predictions, reg_targets)
print(f"MAE loss: {mae_loss.item():.4f}")

Smooth L1 Loss (SmoothL1Loss)

python
# Smooth L1 loss (a form of Huber loss)
smooth_l1_criterion = nn.SmoothL1Loss()
smooth_l1_loss = smooth_l1_criterion(reg_predictions, reg_targets)

print(f"Smooth L1 loss: {smooth_l1_loss.item():.4f}")
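
Closely related is nn.HuberLoss (available in PyTorch 1.9+), which exposes the delta threshold explicitly; with delta=1.0 it matches SmoothL1Loss at its default beta. A brief sketch using the regression tensors above:

python
# Huber loss with an explicit delta; errors below delta are squared, above delta are linear
huber_criterion = nn.HuberLoss(delta=1.0)
huber_loss = huber_criterion(reg_predictions, reg_targets)
print(f"Huber loss: {huber_loss.item():.4f}")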

3. Advanced Loss Functions

Focal Loss (for class imbalance)

python
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction
    
    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

# Using Focal Loss
focal_criterion = FocalLoss(alpha=1, gamma=2)
focal_loss = focal_criterion(logits, targets)
print(f"Focal loss: {focal_loss.item():.4f}")

Label Smoothing

python
class LabelSmoothingLoss(nn.Module):
    def __init__(self, num_classes, smoothing=0.1):
        super(LabelSmoothingLoss, self).__init__()
        self.num_classes = num_classes
        self.smoothing = smoothing
        self.confidence = 1.0 - smoothing
    
    def forward(self, pred, target):
        pred = F.log_softmax(pred, dim=1)
        true_dist = torch.zeros_like(pred)
        true_dist.fill_(self.smoothing / (self.num_classes - 1))
        true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
        
        return torch.mean(torch.sum(-true_dist * pred, dim=1))

# Using label smoothing
label_smooth_criterion = LabelSmoothingLoss(num_classes=10, smoothing=0.1)
label_smooth_loss = label_smooth_criterion(logits, targets)
print(f"Label smoothing loss: {label_smooth_loss.item():.4f}")
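
For reference, PyTorch 1.10+ also supports label smoothing directly via nn.CrossEntropyLoss, which is usually simpler than a hand-rolled module and should give a comparable value here:

python
# Built-in label smoothing (PyTorch 1.10+)
builtin_ls_criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
builtin_ls_loss = builtin_ls_criterion(logits, targets)
print(f"Built-in label smoothing loss: {builtin_ls_loss.item():.4f}")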

Optimizers

1. Basic Optimizers

SGD (Stochastic Gradient Descent)

python
import torch.optim as optim

# Create a model
model = nn.Sequential(
    nn.Linear(784, 128),
    nn.ReLU(),
    nn.Linear(128, 10)
)

# SGD optimizer
sgd_optimizer = optim.SGD(
    model.parameters(),
    lr=0.01,           # learning rate
    momentum=0.9,      # momentum
    weight_decay=1e-4, # weight decay (L2 regularization)
    nesterov=True      # Nesterov momentum
)

print(f"SGD optimizer: {sgd_optimizer}")

Adam Optimizer

python
# Adam optimizer
adam_optimizer = optim.Adam(
    model.parameters(),
    lr=0.001,                    # learning rate
    betas=(0.9, 0.999),         # momentum coefficients
    eps=1e-8,                   # numerical stability term
    weight_decay=1e-4,          # weight decay
    amsgrad=False               # whether to use the AMSGrad variant
)

# AdamW optimizer (decoupled weight decay)
adamw_optimizer = optim.AdamW(
    model.parameters(),
    lr=0.001,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0.01,  # weight decay is applied more effectively in AdamW
    amsgrad=False
)
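
All PyTorch optimizers also accept per-parameter groups, which is useful when different parts of a model need different learning rates or weight decay. A sketch assuming the Sequential model defined above (the specific values are illustrative):

python
# Per-parameter-group hyperparameters; indices refer to the Sequential model above
grouped_optimizer = optim.AdamW([
    {'params': model[0].parameters(), 'lr': 1e-4},                        # first linear layer
    {'params': model[2].parameters(), 'lr': 1e-3, 'weight_decay': 0.0},   # output layer
], lr=1e-3, weight_decay=0.01)  # defaults for anything not overridden per group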

2. Advanced Optimizers

RMSprop

python
rmsprop_optimizer = optim.RMSprop(
    model.parameters(),
    lr=0.01,
    alpha=0.99,        # smoothing constant
    eps=1e-8,
    weight_decay=0,
    momentum=0,
    centered=False
)

AdaGrad

python
adagrad_optimizer = optim.Adagrad(
    model.parameters(),
    lr=0.01,
    lr_decay=0,
    weight_decay=0,
    initial_accumulator_value=0,
    eps=1e-10
)

3. Custom Optimizers

python
class CustomSGD(optim.Optimizer):
    def __init__(self, params, lr=1e-3, momentum=0, dampening=0, weight_decay=0):
        defaults = dict(lr=lr, momentum=momentum, dampening=dampening, weight_decay=weight_decay)
        super(CustomSGD, self).__init__(params, defaults)
    
    def step(self, closure=None):
        loss = None
        if closure is not None:
            loss = closure()
        
        for group in self.param_groups:
            weight_decay = group['weight_decay']
            momentum = group['momentum']
            dampening = group['dampening']
            
            for p in group['params']:
                if p.grad is None:
                    continue
                
                d_p = p.grad.data
                
                # Weight decay (L2 regularization): add the parameter itself to the gradient
                if weight_decay != 0:
                    d_p.add_(p.data, alpha=weight_decay)
                
                # Classical momentum: maintain a running buffer of past gradients
                if momentum != 0:
                    param_state = self.state[p]
                    if len(param_state) == 0:
                        param_state['momentum_buffer'] = torch.zeros_like(p.data)
                    
                    buf = param_state['momentum_buffer']
                    buf.mul_(momentum).add_(d_p, alpha=1 - dampening)
                    d_p = buf
                
                # Gradient descent update
                p.data.add_(d_p, alpha=-group['lr'])
        
        return loss
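
Usage is the same as for the built-in optimizers; a quick sanity check (not part of the original example) against the model defined earlier:

python
# CustomSGD drops in wherever optim.SGD would be used
custom_optimizer = CustomSGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
dummy_loss = model(torch.randn(8, 784)).sum()  # arbitrary scalar for illustration
dummy_loss.backward()
custom_optimizer.step()
custom_optimizer.zero_grad()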

Learning Rate Scheduling

1. Basic Schedulers

python
from torch.optim.lr_scheduler import (
    StepLR, MultiStepLR, ExponentialLR, ReduceLROnPlateau,
    CosineAnnealingLR, CosineAnnealingWarmRestarts
)

# Step decay
step_scheduler = StepLR(
    optimizer=adam_optimizer,
    step_size=30,    # decay every 30 epochs
    gamma=0.1        # decay factor
)

# Multi-step decay
multistep_scheduler = MultiStepLR(
    optimizer=adam_optimizer,
    milestones=[30, 60, 90],  # epochs at which to decay
    gamma=0.1
)

# Exponential decay
exp_scheduler = ExponentialLR(
    optimizer=adam_optimizer,
    gamma=0.95  # multiply the LR by 0.95 every epoch
)
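
These schedulers are stepped once per epoch, after the optimizer updates for that epoch; the current learning rate can be read back with get_last_lr(). A minimal usage sketch (only step_scheduler is stepped here, purely for illustration):

python
for epoch in range(5):
    # ... run the training loop for one epoch (optimizer.step() happens inside) ...
    step_scheduler.step()
    print(f"epoch {epoch}: lr = {step_scheduler.get_last_lr()[0]:.6f}")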

2. Adaptive Schedulers

python
# Scheduler driven by the validation loss
plateau_scheduler = ReduceLROnPlateau(
    optimizer=adam_optimizer,
    mode='min',        # the monitored metric should decrease
    factor=0.5,        # decay factor
    patience=10,       # epochs to wait before reducing
    verbose=True,      # print a message when the LR is reduced
    threshold=0.0001,  # improvement threshold
    threshold_mode='rel',
    cooldown=0,        # cooldown period
    min_lr=0,          # minimum learning rate
    eps=1e-8
)

# Cosine annealing scheduler
cosine_scheduler = CosineAnnealingLR(
    optimizer=adam_optimizer,
    T_max=100,    # maximum number of epochs
    eta_min=0     # minimum learning rate
)

# Cosine annealing with warm restarts
cosine_restart_scheduler = CosineAnnealingWarmRestarts(
    optimizer=adam_optimizer,
    T_0=10,       # epochs before the first restart
    T_mult=2,     # factor by which the restart period grows
    eta_min=0
)
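
Note that ReduceLROnPlateau is stepped with the monitored metric (typically the validation loss) instead of being stepped unconditionally each epoch:

python
# ReduceLROnPlateau expects the monitored value; the schedulers above take no argument
val_loss = 0.42  # placeholder value for illustration
plateau_scheduler.step(val_loss)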

3. Custom Schedulers

python
class WarmupLR(torch.optim.lr_scheduler._LRScheduler):
    def __init__(self, optimizer, warmup_epochs, last_epoch=-1):
        self.warmup_epochs = warmup_epochs
        super(WarmupLR, self).__init__(optimizer, last_epoch)
    
    def get_lr(self):
        if self.last_epoch < self.warmup_epochs:
            # Linear warmup
            return [base_lr * (self.last_epoch + 1) / self.warmup_epochs 
                    for base_lr in self.base_lrs]
        else:
            return self.base_lrs

# Combining schedulers
def create_scheduler(optimizer, warmup_epochs=5, total_epochs=100):
    warmup_scheduler = WarmupLR(optimizer, warmup_epochs)
    main_scheduler = CosineAnnealingLR(optimizer, T_max=total_epochs - warmup_epochs)
    
    return warmup_scheduler, main_scheduler
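
The two schedulers returned above still have to be stepped in the correct order by the caller. On PyTorch 1.10+ the same effect can be obtained with SequentialLR, which switches schedulers at a given milestone; a sketch under that assumption:

python
from torch.optim.lr_scheduler import SequentialLR

def create_sequential_scheduler(optimizer, warmup_epochs=5, total_epochs=100):
    warmup_scheduler = WarmupLR(optimizer, warmup_epochs)
    main_scheduler = CosineAnnealingLR(optimizer, T_max=total_epochs - warmup_epochs)
    # Use the warmup schedule first, then hand over to cosine annealing
    return SequentialLR(optimizer, schedulers=[warmup_scheduler, main_scheduler],
                        milestones=[warmup_epochs])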

Training Loop Examples

1. Basic Training Loop

python
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for batch_idx, (data, target) in enumerate(dataloader):
        data, target = data.to(device), target.to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        output = model(data)
        loss = criterion(output, target)
        
        # Backward pass
        loss.backward()
        
        # Gradient clipping (optional)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Update parameters
        optimizer.step()
        
        # Statistics
        total_loss += loss.item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        total += target.size(0)
    
    avg_loss = total_loss / len(dataloader)
    accuracy = 100. * correct / total
    
    return avg_loss, accuracy

def validate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            
            total_loss += loss.item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += target.size(0)
    
    avg_loss = total_loss / len(dataloader)
    accuracy = 100. * correct / total
    
    return avg_loss, accuracy

2. Complete Training Pipeline

python
def train_model(model, train_loader, val_loader, num_epochs=100):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    
    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
    
    # Learning rate scheduler
    scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs)
    
    # Training history
    train_losses, train_accs = [], []
    val_losses, val_accs = [], []
    
    best_val_acc = 0
    
    for epoch in range(num_epochs):
        # Train
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        
        # Validate
        val_loss, val_acc = validate(model, val_loader, criterion, device)
        
        # Update the learning rate
        scheduler.step()
        
        # Record history
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        
        # Save the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')
        
        # Print progress
        if epoch % 10 == 0:
            print(f'Epoch {epoch}/{num_epochs}:')
            print(f'  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
            print(f'  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
            print(f'  LR: {optimizer.param_groups[0]["lr"]:.6f}')
    
    return train_losses, train_accs, val_losses, val_accs

Advanced Training Techniques

1. Gradient Accumulation

python
def train_with_gradient_accumulation(model, dataloader, criterion, optimizer,
                                     device, accumulation_steps=4):
    model.train()
    optimizer.zero_grad()
    
    for batch_idx, (data, target) in enumerate(dataloader):
        data, target = data.to(device), target.to(device)
        
        # Forward pass
        output = model(data)
        loss = criterion(output, target)
        
        # Scale the loss so accumulated gradients average over the effective batch
        loss = loss / accumulation_steps
        
        # Backward pass (gradients accumulate across iterations)
        loss.backward()
        
        # Update parameters once every accumulation_steps mini-batches
        if (batch_idx + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
    
    # Flush any remaining gradients from a final incomplete accumulation group
    if (batch_idx + 1) % accumulation_steps != 0:
        optimizer.step()
        optimizer.zero_grad()

2. Mixed-Precision Training

python
from torch.cuda.amp import GradScaler, autocast

def train_with_mixed_precision(model, dataloader, criterion, optimizer, device):
    scaler = GradScaler()
    model.train()
    
    for data, target in dataloader:
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        
        # Forward pass under autocast (ops run in reduced precision where safe)
        with autocast():
            output = model(data)
            loss = criterion(output, target)
        
        # Scale the loss and backpropagate
        scaler.scale(loss).backward()
        
        # Unscale internally, then update parameters
        scaler.step(optimizer)
        scaler.update()
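
If gradient clipping is combined with mixed precision, the gradients must be unscaled before clipping so the norm is computed on their true values. A sketch of that variant, reusing the GradScaler/autocast imports above (the function name and max_norm are illustrative):

python
def train_amp_with_clipping(model, dataloader, criterion, optimizer, device, max_norm=1.0):
    scaler = GradScaler()
    model.train()
    
    for data, target in dataloader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        
        with autocast():
            output = model(data)
            loss = criterion(output, target)
        
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)  # bring gradients back to their real scale
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_norm)
        scaler.step(optimizer)      # skips the update if inf/NaN gradients were detected
        scaler.update()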

3. Early Stopping

python
class EarlyStopping:
    def __init__(self, patience=7, min_delta=0, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.best_loss = None
        self.counter = 0
        self.best_weights = None
    
    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.save_checkpoint(model)
        elif val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            self.save_checkpoint(model)
        else:
            self.counter += 1
        
        if self.counter >= self.patience:
            if self.restore_best_weights:
                model.load_state_dict(self.best_weights)
            return True
        return False
    
    def save_checkpoint(self, model):
        # Clone the tensors: a shallow copy of state_dict() keeps references
        # that would continue to change as training proceeds
        self.best_weights = {k: v.detach().clone() for k, v in model.state_dict().items()}

# Using early stopping
early_stopping = EarlyStopping(patience=10)

for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    
    if early_stopping(val_loss, model):
        print(f"Early stopping at epoch {epoch}")
        break

Summary of Optimization Tips

1. Choosing a Suitable Optimizer

python
# Suggested optimizer choices for different task types
def get_optimizer(model, task_type='classification'):
    if task_type == 'classification':
        return optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
    elif task_type == 'regression':
        return optim.Adam(model.parameters(), lr=0.001)
    elif task_type == 'gan':
        return optim.Adam(model.parameters(), lr=0.0002, betas=(0.5, 0.999))
    else:
        return optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

2. Setting the Learning Rate

python
# Learning rate finder
class LRFinder:
    def __init__(self, model, optimizer, criterion, device):
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device
    
    def find_lr(self, dataloader, start_lr=1e-7, end_lr=10, num_iter=100):
        lrs = []
        losses = []
        
        lr = start_lr
        self.optimizer.param_groups[0]['lr'] = lr
        
        for i, (data, target) in enumerate(dataloader):
            if i >= num_iter:
                break
            
            data, target = data.to(self.device), target.to(self.device)
            
            self.optimizer.zero_grad()
            output = self.model(data)
            loss = self.criterion(output, target)
            loss.backward()
            self.optimizer.step()
            
            lrs.append(lr)
            losses.append(loss.item())
            
            lr *= (end_lr / start_lr) ** (1 / num_iter)
            self.optimizer.param_groups[0]['lr'] = lr
        
        return lrs, losses
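
A typical workflow is to plot loss against learning rate on a log scale and pick a value somewhat below the point where the loss starts to blow up. A usage sketch, assuming matplotlib is installed and a train_loader like the one used in the training examples (note that the finder updates the model, so in practice you would run it on a fresh copy):

python
import matplotlib.pyplot as plt

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
finder = LRFinder(model.to(device), optim.SGD(model.parameters(), lr=1e-7),
                  nn.CrossEntropyLoss(), device)
lrs, losses = finder.find_lr(train_loader)

plt.plot(lrs, losses)
plt.xscale('log')
plt.xlabel('learning rate')
plt.ylabel('loss')
plt.show()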

Summary

Loss functions and optimizers are the core components of deep learning training:

  1. Loss function selection: choose a loss function suited to the task type
  2. Optimizer selection: understand the characteristics of different optimizers and when to use them
  3. Learning rate scheduling: apply an appropriate learning rate schedule
  4. Training techniques: master gradient accumulation, mixed precision, early stopping, and similar tools
  5. Debugging and tuning: learn to diagnose and fix problems that arise during training

Mastering these topics will help you train better deep learning models.
