
TensorFlow Keras 高级API

Keras简介

Keras是TensorFlow的高级API,提供了简洁、直观的接口来构建和训练深度学习模型。从TensorFlow 2.0开始,Keras已经完全集成到TensorFlow中,成为构建神经网络的主要方式。

python
import tensorflow as tf
from tensorflow import keras
import numpy as np

print(f"TensorFlow版本: {tf.__version__}")
print(f"Keras版本: {keras.__version__}")

模型构建方式

1. Sequential模型

最简单的模型构建方式,适用于层的线性堆叠。

python
# 方式1:构造函数中定义
model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    keras.layers.Dropout(0.2),
    keras.layers.Dense(32, activation='relu'),
    keras.layers.Dense(10, activation='softmax')
])

print(model.summary())

# 方式2:逐层添加
model = keras.Sequential()
model.add(keras.layers.Dense(64, activation='relu', input_shape=(784,)))
model.add(keras.layers.Dropout(0.2))
model.add(keras.layers.Dense(32, activation='relu'))
model.add(keras.layers.Dense(10, activation='softmax'))

# 方式3:使用名称
model = keras.Sequential([
    keras.layers.Dense(64, activation='relu', input_shape=(784,), name='hidden1'),
    keras.layers.Dropout(0.2, name='dropout1'),
    keras.layers.Dense(32, activation='relu', name='hidden2'),
    keras.layers.Dense(10, activation='softmax', name='output')
], name='mnist_model')

print(f"模型名称: {model.name}")

2. Functional API

更灵活的模型构建方式,支持复杂的网络结构。

python
# 定义输入
inputs = keras.Input(shape=(784,), name='input_layer')

# 构建网络
x = keras.layers.Dense(64, activation='relu', name='hidden1')(inputs)
x = keras.layers.Dropout(0.2, name='dropout1')(x)
x = keras.layers.Dense(32, activation='relu', name='hidden2')(x)
outputs = keras.layers.Dense(10, activation='softmax', name='output')(x)

# 创建模型
model = keras.Model(inputs=inputs, outputs=outputs, name='functional_model')

print(model.summary())

# 多输入多输出示例
input1 = keras.Input(shape=(64,), name='input1')
input2 = keras.Input(shape=(32,), name='input2')

# 处理第一个输入
x1 = keras.layers.Dense(32, activation='relu')(input1)
x1 = keras.layers.Dropout(0.2)(x1)

# 处理第二个输入
x2 = keras.layers.Dense(16, activation='relu')(input2)

# 合并两个分支
merged = keras.layers.concatenate([x1, x2])
output1 = keras.layers.Dense(10, activation='softmax', name='classification')(merged)
output2 = keras.layers.Dense(1, activation='sigmoid', name='regression')(merged)

# 创建多输出模型
multi_model = keras.Model(
    inputs=[input1, input2],
    outputs=[output1, output2],
    name='multi_io_model'
)

print(multi_model.summary())

3. 子类化模型

最灵活的方式,通过继承keras.Model类来定义模型。

python
class CustomModel(keras.Model):
    """Subclassed Keras model: Dense(64) -> Dropout(0.2) -> Dense(32) -> softmax.

    Parameters
    ----------
    num_classes : int
        Number of output classes produced by the final softmax layer.
    """

    def __init__(self, num_classes=10):
        # Zero-argument super() is the modern Python 3 idiom.
        super().__init__(name='custom_model')
        self.num_classes = num_classes

        # Layers are created once here and reused on every forward pass.
        self.dense1 = keras.layers.Dense(64, activation='relu')
        self.dropout1 = keras.layers.Dropout(0.2)
        self.dense2 = keras.layers.Dense(32, activation='relu')
        self.classifier = keras.layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=None):
        """Forward pass; `training` switches dropout on (True) or off (False/None)."""
        x = self.dense1(inputs)
        # Dropout only drops units when training=True.
        x = self.dropout1(x, training=training)
        x = self.dense2(x)
        return self.classifier(x)

    def get_config(self):
        """Return a serializable config so the model can be re-created from it."""
        config = super().get_config()
        config.update({'num_classes': self.num_classes})
        return config

# 创建自定义模型
custom_model = CustomModel(num_classes=10)

# 需要先调用模型来构建
dummy_input = tf.random.normal([1, 784])
_ = custom_model(dummy_input)

print(custom_model.summary())

常用层详解

1. 核心层

python
# Dense层(全连接层)
dense = keras.layers.Dense(
    units=64,                    # 神经元数量
    activation='relu',           # 激活函数
    use_bias=True,              # 是否使用偏置
    kernel_initializer='glorot_uniform',  # 权重初始化
    bias_initializer='zeros',    # 偏置初始化
    kernel_regularizer=keras.regularizers.l2(0.01),  # 权重正则化
    name='dense_layer'
)

# Dropout层
dropout = keras.layers.Dropout(
    rate=0.2,                   # 丢弃率
    noise_shape=None,           # 噪声形状
    seed=None                   # 随机种子
)

# Activation层
activation = keras.layers.Activation('relu')
# 或者使用特定激活函数
relu = keras.layers.ReLU()
leaky_relu = keras.layers.LeakyReLU(alpha=0.1)

2. 卷积层

python
# 2D卷积层
conv2d = keras.layers.Conv2D(
    filters=32,                 # 卷积核数量
    kernel_size=(3, 3),         # 卷积核大小
    strides=(1, 1),            # 步长
    padding='valid',            # 填充方式:'valid' 或 'same'
    activation='relu',          # 激活函数
    use_bias=True,             # 是否使用偏置
    kernel_initializer='glorot_uniform'
)

# 1D卷积层(用于序列数据)
conv1d = keras.layers.Conv1D(
    filters=64,
    kernel_size=3,
    activation='relu'
)

# 转置卷积层(反卷积)
conv2d_transpose = keras.layers.Conv2DTranspose(
    filters=32,
    kernel_size=(3, 3),
    strides=(2, 2),
    padding='same'
)

# 深度可分离卷积
separable_conv = keras.layers.SeparableConv2D(
    filters=32,
    kernel_size=(3, 3),
    activation='relu'
)

3. 池化层

python
# 最大池化
max_pool = keras.layers.MaxPooling2D(
    pool_size=(2, 2),          # 池化窗口大小
    strides=None,              # 步长(默认等于pool_size)
    padding='valid'            # 填充方式
)

# 平均池化
avg_pool = keras.layers.AveragePooling2D(
    pool_size=(2, 2),
    strides=None,
    padding='valid'
)

# 全局池化
global_max_pool = keras.layers.GlobalMaxPooling2D()
global_avg_pool = keras.layers.GlobalAveragePooling2D()

# 1D池化
max_pool_1d = keras.layers.MaxPooling1D(pool_size=2)

4. 循环层

python
# LSTM层
lstm = keras.layers.LSTM(
    units=128,                  # 隐藏单元数
    activation='tanh',          # 激活函数
    recurrent_activation='sigmoid',  # 循环激活函数
    use_bias=True,
    return_sequences=False,     # 是否返回完整序列
    return_state=False,         # 是否返回最终状态
    dropout=0.0,               # 输入dropout
    recurrent_dropout=0.0      # 循环dropout
)

# GRU层
gru = keras.layers.GRU(
    units=128,
    activation='tanh',
    return_sequences=True
)

# 简单RNN层
simple_rnn = keras.layers.SimpleRNN(
    units=64,
    activation='tanh'
)

# 双向RNN
bidirectional_lstm = keras.layers.Bidirectional(
    keras.layers.LSTM(64, return_sequences=True)
)

5. 正则化层

python
# 批量归一化
batch_norm = keras.layers.BatchNormalization(
    axis=-1,                   # 归一化的轴
    momentum=0.99,             # 移动平均的动量
    epsilon=0.001,             # 数值稳定性参数
    center=True,               # 是否使用beta参数
    scale=True                 # 是否使用gamma参数
)

# 层归一化
layer_norm = keras.layers.LayerNormalization(
    axis=-1,
    epsilon=0.001
)

# Dropout
dropout = keras.layers.Dropout(0.2)

# 空间Dropout(用于卷积层)
spatial_dropout = keras.layers.SpatialDropout2D(0.2)

6. 其他常用层

python
# Flatten层(展平)
flatten = keras.layers.Flatten()

# Reshape层
reshape = keras.layers.Reshape((28, 28, 1))

# Permute层(维度重排)
permute = keras.layers.Permute((2, 1))

# RepeatVector层
repeat = keras.layers.RepeatVector(3)

# Lambda层(自定义操作)
lambda_layer = keras.layers.Lambda(lambda x: tf.square(x))

# 嵌入层
embedding = keras.layers.Embedding(
    input_dim=10000,           # 词汇表大小
    output_dim=128,            # 嵌入维度
    input_length=100           # 输入序列长度
)

模型编译

python
# 基本编译
# Basic compilation
model.compile(
    optimizer='adam',           # optimizer
    loss='sparse_categorical_crossentropy',  # loss function
    metrics=['accuracy']        # evaluation metrics
)

# Detailed optimizer configuration
model.compile(
    optimizer=keras.optimizers.Adam(
        learning_rate=0.001,
        beta_1=0.9,
        beta_2=0.999,
        epsilon=1e-7
    ),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=[
        keras.metrics.SparseCategoricalAccuracy(name='accuracy'),
        # Labels here are sparse integer class ids (matching the sparse loss
        # above), so the sparse top-k metric is required;
        # TopKCategoricalAccuracy expects one-hot encoded labels instead.
        keras.metrics.SparseTopKCategoricalAccuracy(k=5, name='top5_accuracy')
    ]
)

# Compiling a multi-output model: per-output losses, weights, and metrics
multi_model.compile(
    optimizer='adam',
    loss={
        'classification': 'sparse_categorical_crossentropy',
        'regression': 'mse'
    },
    loss_weights={
        'classification': 1.0,
        'regression': 0.5
    },
    metrics={
        'classification': ['accuracy'],
        'regression': ['mae']
    }
)

模型训练

1. 基本训练

python
# 准备数据
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
x_test = x_test.reshape(-1, 784).astype('float32') / 255.0

# 训练模型
history = model.fit(
    x_train, y_train,
    batch_size=32,             # 批量大小
    epochs=10,                 # 训练轮数
    validation_data=(x_test, y_test),  # 验证数据
    verbose=1,                 # 详细程度:0=静默,1=进度条,2=每轮一行
    shuffle=True               # 是否打乱数据
)

# 查看训练历史
print("训练历史键:", history.history.keys())
print("最终训练准确率:", history.history['accuracy'][-1])
print("最终验证准确率:", history.history['val_accuracy'][-1])

2. 使用回调函数

python
# 定义回调函数
callbacks = [
    # 早停
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True
    ),
    
    # 学习率调度
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=3,
        min_lr=1e-7
    ),
    
    # 模型检查点
    keras.callbacks.ModelCheckpoint(
        filepath='best_model.h5',
        monitor='val_accuracy',
        save_best_only=True,
        save_weights_only=False
    ),
    
    # TensorBoard日志
    keras.callbacks.TensorBoard(
        log_dir='./logs',
        histogram_freq=1,
        write_graph=True
    ),
    
    # 自定义回调
    keras.callbacks.LambdaCallback(
        on_epoch_end=lambda epoch, logs: print(f"Epoch {epoch}: loss={logs['loss']:.4f}")
    )
]

# 使用回调函数训练
history = model.fit(
    x_train, y_train,
    batch_size=32,
    epochs=100,
    validation_data=(x_test, y_test),
    callbacks=callbacks
)

3. 自定义训练循环

python
# 自定义训练步骤
@tf.function
def train_step(x, y, model, optimizer, loss_fn, train_accuracy):
    """Run one optimization step on a single batch and update the accuracy metric."""
    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
        preds = model(x, training=True)
        loss = loss_fn(y, preds)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    train_accuracy.update_state(y, preds)
    return loss

# 自定义验证步骤
@tf.function
def val_step(x, y, model, loss_fn, val_accuracy):
    """Evaluate one batch (no gradient updates) and update the accuracy metric."""
    preds = model(x, training=False)
    batch_loss = loss_fn(y, preds)
    val_accuracy.update_state(y, preds)
    return batch_loss

# 训练循环
def custom_training_loop(model, train_dataset, val_dataset, epochs):
    """Hand-written fit loop: one training pass and one validation pass per epoch."""
    optimizer = keras.optimizers.Adam()
    loss_fn = keras.losses.SparseCategoricalCrossentropy()

    train_accuracy = keras.metrics.SparseCategoricalAccuracy()
    val_accuracy = keras.metrics.SparseCategoricalAccuracy()

    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")

        # --- training pass: accumulate the summed batch loss ---
        train_accuracy.reset_states()
        train_loss = 0
        for batch_x, batch_y in train_dataset:
            train_loss += train_step(batch_x, batch_y, model, optimizer,
                                     loss_fn, train_accuracy)

        # --- validation pass ---
        val_accuracy.reset_states()
        val_loss = 0
        for batch_x, batch_y in val_dataset:
            val_loss += val_step(batch_x, batch_y, model, loss_fn, val_accuracy)

        print(f"Loss: {train_loss:.4f}, Accuracy: {train_accuracy.result():.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy.result():.4f}")

# 准备数据集
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)
val_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(32)

# 运行自定义训练
# custom_training_loop(model, train_dataset, val_dataset, epochs=5)

模型评估和预测

python
# 模型评估
test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)
print(f"测试损失: {test_loss:.4f}")
print(f"测试准确率: {test_accuracy:.4f}")

# 详细评估
evaluation = model.evaluate(
    x_test, y_test,
    batch_size=32,
    verbose=1,
    return_dict=True
)
print("详细评估结果:", evaluation)

# 预测
predictions = model.predict(x_test[:10])
print(f"预测形状: {predictions.shape}")
print(f"前5个预测: {np.argmax(predictions[:5], axis=1)}")
print(f"真实标签: {y_test[:5]}")

# 批量预测
batch_predictions = model.predict(
    x_test,
    batch_size=32,
    verbose=1
)

# 单样本预测
single_prediction = model.predict(x_test[0:1])
predicted_class = np.argmax(single_prediction, axis=1)[0]
print(f"单样本预测类别: {predicted_class}")

模型保存和加载

python
# 保存整个模型
model.save('my_model.h5')  # HDF5格式
model.save('my_model')     # SavedModel格式(推荐)

# 只保存权重
model.save_weights('model_weights.h5')

# 保存模型架构
model_json = model.to_json()
with open('model_architecture.json', 'w') as f:
    f.write(model_json)

# 加载模型
loaded_model = keras.models.load_model('my_model.h5')

# 加载权重
model.load_weights('model_weights.h5')

# 从架构重建模型
with open('model_architecture.json', 'r') as f:
    model_json = f.read()
model_from_json = keras.models.model_from_json(model_json)
model_from_json.load_weights('model_weights.h5')

# 验证加载的模型
loaded_predictions = loaded_model.predict(x_test[:5])
original_predictions = model.predict(x_test[:5])
print("模型加载验证:", np.allclose(loaded_predictions, original_predictions))

模型可视化

python
# 绘制模型结构
keras.utils.plot_model(
    model,
    to_file='model.png',
    show_shapes=True,
    show_layer_names=True,
    rankdir='TB',              # 'TB'=从上到下,'LR'=从左到右
    expand_nested=False,
    dpi=96
)

# 查看模型摘要
print(model.summary())

# 获取层信息
for i, layer in enumerate(model.layers):
    print(f"层 {i}: {layer.name} - {layer.__class__.__name__}")
    if hasattr(layer, 'units'):
        print(f"  单元数: {layer.units}")
    if hasattr(layer, 'activation'):
        print(f"  激活函数: {layer.activation}")
    print(f"  输出形状: {layer.output_shape}")
    print(f"  参数数量: {layer.count_params()}")
    print()

# 可视化训练历史
import matplotlib.pyplot as plt

def plot_training_history(history):
    """Plot training-vs-validation loss and accuracy curves side by side."""
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    # (axis, history key, train label, val label, title, y-axis label)
    panels = [
        (axes[0], 'loss', '训练损失', '验证损失', '模型损失', '损失'),
        (axes[1], 'accuracy', '训练准确率', '验证准确率', '模型准确率', '准确率'),
    ]
    for ax, key, train_label, val_label, title, ylabel in panels:
        ax.plot(history.history[key], label=train_label)
        ax.plot(history.history['val_' + key], label=val_label)
        ax.set_title(title)
        ax.set_xlabel('轮次')
        ax.set_ylabel(ylabel)
        ax.legend()

    plt.tight_layout()
    plt.show()

# plot_training_history(history)

高级功能

1. 自定义层

python
class CustomDense(keras.layers.Layer):
    """Custom fully-connected layer: output = activation(inputs @ w + b).

    Parameters
    ----------
    units : int
        Number of output units.
    activation : str | callable | None
        Activation, resolved with ``keras.activations.get`` (accepts a name
        like ``'relu'``, a callable, or None for a linear layer).
    """

    def __init__(self, units, activation=None, **kwargs):
        # Zero-argument super() is the modern Python 3 idiom.
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, input_shape):
        """Create the weights lazily, once the input feature size is known."""
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer='random_normal',
            trainable=True,
            name='kernel'
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='zeros',
            trainable=True,
            name='bias'
        )
        super().build(input_shape)

    def call(self, inputs):
        """Affine transform followed by the optional activation."""
        output = tf.matmul(inputs, self.w) + self.b
        if self.activation is not None:
            output = self.activation(output)
        return output

    def get_config(self):
        """Serializable config so the layer round-trips through save/load."""
        config = super().get_config()
        config.update({
            'units': self.units,
            'activation': keras.activations.serialize(self.activation)
        })
        return config

# 使用自定义层
model_with_custom = keras.Sequential([
    CustomDense(64, activation='relu', input_shape=(784,)),
    CustomDense(10, activation='softmax')
])

2. 自定义损失函数

python
def custom_loss(y_true, y_pred):
    """Mean squared error between targets and predictions.

    Casts ``y_true`` to ``y_pred``'s dtype first: labels often arrive as
    integers while predictions are floats, and TF raises a dtype-mismatch
    error on the subtraction otherwise.
    """
    y_true = tf.cast(y_true, y_pred.dtype)
    return tf.reduce_mean(tf.square(y_true - y_pred))

# 使用自定义损失
model.compile(
    optimizer='adam',
    loss=custom_loss,
    metrics=['accuracy']
)

3. 自定义指标

python
class CustomAccuracy(keras.metrics.Metric):
    """Streaming accuracy: integer labels vs. argmax of class predictions."""

    def __init__(self, name='custom_accuracy', **kwargs):
        super().__init__(name=name, **kwargs)
        # Running totals: number of correct predictions / number of samples.
        self.total = self.add_weight(name='total', initializer='zeros')
        self.count = self.add_weight(name='count', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        # NOTE(review): sample_weight is accepted for API compatibility
        # but ignored, as in the original implementation.
        predicted = tf.argmax(y_pred, axis=1)
        labels = tf.cast(y_true, predicted.dtype)

        hits = tf.cast(tf.equal(labels, predicted), tf.float32)
        self.total.assign_add(tf.reduce_sum(hits))
        self.count.assign_add(tf.cast(tf.size(labels), tf.float32))

    def result(self):
        return self.total / self.count

    def reset_states(self):
        self.total.assign(0)
        self.count.assign(0)

# 使用自定义指标
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=[CustomAccuracy()]
)

总结

Keras提供了构建深度学习模型的高级接口,主要特点包括:

  1. 多种构建方式:Sequential、Functional API、子类化模型
  2. 丰富的层类型:Dense、Conv2D、LSTM等各种预定义层
  3. 灵活的训练:fit方法、自定义训练循环、回调函数
  4. 完整的工具链:模型保存加载、可视化、评估预测
  5. 高度可扩展:支持自定义层、损失函数、指标

掌握Keras API是使用TensorFlow进行深度学习的关键技能!

本站内容仅供学习和研究使用。