TensorFlow 模型构建
模型构建方式概览
TensorFlow 提供了多种构建模型的方式,从简单的 Sequential API 到复杂的自定义模型。选择合适的方式取决于模型的复杂程度和具体需求。
python
import tensorflow as tf
import numpy as np
# 检查TensorFlow版本
print(f"TensorFlow版本: {tf.__version__}")
# 设置随机种子以确保结果可重现
tf.random.set_seed(42)
np.random.seed(42)

Sequential API:顺序模型
基本用法
python
# 最简单的模型构建方式
model = tf.keras.Sequential([
tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
# 查看模型结构
model.summary()
# 编译模型
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)

逐层添加
python
# 创建空的Sequential模型
model = tf.keras.Sequential()
# 逐层添加
model.add(tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Dropout(0.3))
model.add(tf.keras.layers.Dense(64, activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(10, activation='softmax'))
print("逐层添加的模型:")
model.summary()

实际示例:MNIST分类器
python
# 加载MNIST数据集
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
# 数据预处理
x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
# 构建Sequential模型
mnist_model = tf.keras.Sequential([
tf.keras.layers.Dense(512, activation='relu', input_shape=(784,)),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(10, activation='softmax')
])
# 编译模型
mnist_model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# 训练模型(少量epoch用于演示)
history = mnist_model.fit(
x_train[:1000], y_train[:1000],
epochs=3,
batch_size=32,
validation_split=0.2,
verbose=1
)
# 评估模型
test_loss, test_acc = mnist_model.evaluate(x_test[:200], y_test[:200], verbose=0)
print(f"测试准确率: {test_acc:.4f}")

Functional API:函数式模型
基本概念
python
# 函数式API允许构建更复杂的模型结构
inputs = tf.keras.Input(shape=(784,))
# 构建网络
x = tf.keras.layers.Dense(64, activation='relu')(inputs)
x = tf.keras.layers.Dropout(0.2)(x)
x = tf.keras.layers.Dense(32, activation='relu')(x)
outputs = tf.keras.layers.Dense(10, activation='softmax')(x)
# 创建模型
functional_model = tf.keras.Model(inputs=inputs, outputs=outputs)
functional_model.summary()

多输入多输出模型
python
# 多输入模型示例
# 输入1:图像特征
image_input = tf.keras.Input(shape=(64, 64, 3), name='image_input')
x1 = tf.keras.layers.Conv2D(32, 3, activation='relu')(image_input)
x1 = tf.keras.layers.GlobalAveragePooling2D()(x1)
# 输入2:数值特征
numeric_input = tf.keras.Input(shape=(10,), name='numeric_input')
x2 = tf.keras.layers.Dense(32, activation='relu')(numeric_input)
# 合并两个输入
combined = tf.keras.layers.concatenate([x1, x2])
z = tf.keras.layers.Dense(64, activation='relu')(combined)
# 多个输出
output1 = tf.keras.layers.Dense(1, activation='sigmoid', name='binary_output')(z)
output2 = tf.keras.layers.Dense(3, activation='softmax', name='categorical_output')(z)
# 创建多输入多输出模型
multi_model = tf.keras.Model(
inputs=[image_input, numeric_input],
outputs=[output1, output2]
)
multi_model.summary()
# 编译多输出模型
multi_model.compile(
optimizer='adam',
loss={
'binary_output': 'binary_crossentropy',
'categorical_output': 'sparse_categorical_crossentropy'
},
metrics={
'binary_output': ['accuracy'],
'categorical_output': ['accuracy']
}
)

残差连接示例
python
def residual_block(x, filters):
    """Apply a dense residual block to a Keras tensor.

    Two ``Dense`` layers of width ``filters`` form the residual path. When
    the last axis of ``x`` is not already ``filters`` wide, the skip path is
    projected with one more ``Dense`` layer so both paths can be added.

    Args:
        x: Input tensor; only its last-axis size is inspected here.
        filters: Output width of the block.

    Returns:
        ReLU of the sum of the residual path and the skip path.
    """
    layers = tf.keras.layers
    identity = x
    # Residual path: the second layer stays linear until after the addition.
    hidden = layers.Dense(filters, activation='relu')(x)
    hidden = layers.Dense(filters)(hidden)
    # Project the skip path only when the widths differ.
    needs_projection = identity.shape[-1] != filters
    if needs_projection:
        identity = layers.Dense(filters)(identity)
    summed = layers.Add()([hidden, identity])
    return layers.Activation('relu')(summed)
# 构建带残差连接的模型
inputs = tf.keras.Input(shape=(100,))
x = tf.keras.layers.Dense(64, activation='relu')(inputs)
# 添加多个残差块
x = residual_block(x, 64)
x = residual_block(x, 64)
x = residual_block(x, 128)
outputs = tf.keras.layers.Dense(10, activation='softmax')(x)
residual_model = tf.keras.Model(inputs=inputs, outputs=outputs)
residual_model.summary()

自定义层
简单自定义层
python
class CustomDense(tf.keras.layers.Layer):
    """Fully connected layer computing ``activation(inputs @ w + b)``.

    Args:
        units: Size of the last axis of the output.
        activation: Activation identifier resolved with
            ``tf.keras.activations.get``.
        **kwargs: Base-layer keyword arguments (for example ``name``,
            ``dtype`` or ``input_shape``) forwarded to
            ``tf.keras.layers.Layer``.
    """

    def __init__(self, units, activation=None, **kwargs):
        # Forwarding **kwargs is what allows ``input_shape=...`` at the call
        # site and lets ``from_config`` rebuild the layer from the dictionary
        # returned by ``get_config`` (which extends the base-layer config).
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)

    def build(self, input_shape):
        """Create the kernel and bias once the input feature size is known."""
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer='random_normal',
            trainable=True,
            name='weights',
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='zeros',
            trainable=True,
            name='bias',
        )

    def call(self, inputs):
        """Return the affine projection of ``inputs`` passed through the activation."""
        output = tf.matmul(inputs, self.w) + self.b
        if self.activation is not None:
            output = self.activation(output)
        return output

    def get_config(self):
        """Return the base config extended with ``units`` and ``activation``."""
        config = super().get_config()
        config.update({
            'units': self.units,
            'activation': tf.keras.activations.serialize(self.activation),
        })
        return config
# 使用自定义层
custom_model = tf.keras.Sequential([
CustomDense(64, activation='relu', input_shape=(784,)),
CustomDense(32, activation='relu'),
CustomDense(10, activation='softmax')
])
custom_model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
custom_model.summary()

复杂自定义层:注意力机制
python
class AttentionLayer(tf.keras.layers.Layer):
    """Additive attention pooling over axis 1 of the input.

    Each position is scored with ``tanh(inputs @ W + b) . u``; the scores are
    softmax-normalised along axis 1 and used to take a weighted sum of the
    inputs along that axis.

    Args:
        attention_dim: Width of the hidden projection used for scoring.
        **kwargs: Base-layer keyword arguments (``name``, ``dtype``, ...)
            forwarded to ``tf.keras.layers.Layer``. Accepting them is what
            makes ``from_config(get_config())`` work.
    """

    def __init__(self, attention_dim, **kwargs):
        super().__init__(**kwargs)
        self.attention_dim = attention_dim

    def build(self, input_shape):
        """Create projection, bias and context-vector weights."""
        self.W = self.add_weight(
            shape=(input_shape[-1], self.attention_dim),
            initializer='glorot_uniform',
            trainable=True,
            name='attention_weights',
        )
        self.b = self.add_weight(
            shape=(self.attention_dim,),
            initializer='zeros',
            trainable=True,
            name='attention_bias',
        )
        self.u = self.add_weight(
            shape=(self.attention_dim,),
            initializer='glorot_uniform',
            trainable=True,
            name='attention_context',
        )

    def call(self, inputs):
        """Return the attention-weighted sum of ``inputs`` over axis 1.

        NOTE(review): the axis-1 softmax and sum suggest inputs shaped
        (batch, steps, features) -- confirm against callers.
        """
        # Hidden representation of every position, then a scalar score each.
        uit = tf.tanh(tf.tensordot(inputs, self.W, axes=1) + self.b)
        ait = tf.tensordot(uit, self.u, axes=1)
        # Normalise scores along axis 1 and add a trailing axis so they
        # broadcast against the last axis of ``inputs``.
        attention_weights = tf.nn.softmax(ait, axis=1)
        attention_weights = tf.expand_dims(attention_weights, -1)
        weighted_input = inputs * attention_weights
        return tf.reduce_sum(weighted_input, axis=1)

    def get_config(self):
        """Return the base config extended with ``attention_dim``."""
        config = super().get_config()
        config.update({'attention_dim': self.attention_dim})
        return config
# 使用注意力层的模型
sequence_input = tf.keras.Input(shape=(20, 64)) # 序列长度20,特征维度64
attention_output = AttentionLayer(32)(sequence_input)
dense_output = tf.keras.layers.Dense(10, activation='softmax')(attention_output)
attention_model = tf.keras.Model(inputs=sequence_input, outputs=dense_output)
attention_model.summary()

自定义模型类
基本自定义模型
python
class CustomModel(tf.keras.Model):
    """Subclassed classifier: two Dense+Dropout stages and a softmax head.

    Args:
        num_classes: Number of units in the final softmax layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.num_classes = num_classes
        layers = tf.keras.layers
        # Layers are created in the order they are applied in ``call``.
        self.dense1 = layers.Dense(64, activation='relu')
        self.dropout1 = layers.Dropout(0.2)
        self.dense2 = layers.Dense(32, activation='relu')
        self.dropout2 = layers.Dropout(0.2)
        self.classifier = layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=None):
        """Run the forward pass; ``training`` is forwarded to the dropout layers."""
        hidden = self.dropout1(self.dense1(inputs), training=training)
        hidden = self.dropout2(self.dense2(hidden), training=training)
        return self.classifier(hidden)

    def get_config(self):
        """Return the constructor arguments of this model."""
        return dict(num_classes=self.num_classes)
# 创建和使用自定义模型
custom_model = CustomModel(num_classes=10)
# 构建模型(通过调用一次)
sample_input = tf.random.normal([1, 784])
_ = custom_model(sample_input)
custom_model.summary()

复杂自定义模型:ResNet块
python
class ResNetBlock(tf.keras.layers.Layer):
    """Two-convolution residual block with an optional projection shortcut.

    The main path is conv -> batch norm -> ReLU -> conv -> batch norm. The
    shortcut is the identity unless the main path changes the tensor shape,
    in which case a 1x1 convolution plus batch norm projects the input.

    Args:
        filters: Number of output channels of both convolutions.
        kernel_size: Kernel size of both main-path convolutions.
        stride: Stride of the first convolution (and of the projection).
    """

    def __init__(self, filters, kernel_size=3, stride=1):
        super().__init__()
        self.filters = filters
        self.kernel_size = kernel_size
        self.stride = stride
        # Main path.
        self.conv1 = tf.keras.layers.Conv2D(
            filters, kernel_size, strides=stride, padding='same'
        )
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.conv2 = tf.keras.layers.Conv2D(
            filters, kernel_size, padding='same'
        )
        self.bn2 = tf.keras.layers.BatchNormalization()
        # Shortcut path: a strided block always needs a projection.
        self.shortcut_conv = None
        self.shortcut_bn = None
        if stride != 1:
            self._create_projection()

    def _create_projection(self):
        """Create the 1x1 convolution and batch norm used on the shortcut."""
        self.shortcut_conv = tf.keras.layers.Conv2D(
            self.filters, 1, strides=self.stride, padding='same'
        )
        self.shortcut_bn = tf.keras.layers.BatchNormalization()

    def build(self, input_shape):
        """Add a projection when the input channel count differs from ``filters``.

        Without it, a stride-1 block that changes the channel count would
        fail when adding the main path to the identity shortcut.
        """
        # assumes channels-last inputs (last axis = channels) -- TODO confirm
        if self.shortcut_conv is None and input_shape[-1] != self.filters:
            self._create_projection()
        super().build(input_shape)

    def call(self, inputs, training=None):
        """Return ``relu(main_path(inputs) + shortcut(inputs))``."""
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = tf.nn.relu(x)
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        shortcut = inputs
        if self.shortcut_conv is not None:
            shortcut = self.shortcut_conv(inputs)
            shortcut = self.shortcut_bn(shortcut, training=training)
        # Plain tensor addition avoids instantiating a new Add layer per call.
        return tf.nn.relu(x + shortcut)
class MiniResNet(tf.keras.Model):
    """Small ResNet-style image classifier built from three ``ResNetBlock``s.

    Args:
        num_classes: Number of units in the final softmax layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.num_classes = num_classes
        layers = tf.keras.layers
        # Stem: strided convolution, batch norm and max pooling.
        self.initial_conv = layers.Conv2D(32, 7, strides=2, padding='same')
        self.initial_bn = layers.BatchNormalization()
        self.initial_pool = layers.MaxPooling2D(3, strides=2, padding='same')
        # Residual stages.
        self.block1 = ResNetBlock(32)
        self.block2 = ResNetBlock(64, stride=2)
        self.block3 = ResNetBlock(128, stride=2)
        # Classification head.
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=None):
        """Run stem, residual stages and head; ``training`` reaches every batch norm."""
        stem = self.initial_bn(self.initial_conv(inputs), training=training)
        features = self.initial_pool(tf.nn.relu(stem))
        for block in (self.block1, self.block2, self.block3):
            features = block(features, training=training)
        return self.classifier(self.global_pool(features))
# 创建MiniResNet模型
resnet_model = MiniResNet(num_classes=10)
# 构建模型
sample_image = tf.random.normal([1, 224, 224, 3])
_ = resnet_model(sample_image)
resnet_model.summary()

模型子类化高级技巧
动态模型结构
python
class DynamicModel(tf.keras.Model):
    """MLP whose hidden stack is generated from a list of layer widths.

    Args:
        layer_sizes: Width of each hidden ``Dense`` layer, in order.
        num_classes: Number of units in the final softmax layer.
    """

    def __init__(self, layer_sizes, num_classes=10):
        super().__init__()
        self.layer_sizes = layer_sizes
        self.num_classes = num_classes
        # Each width contributes a Dense layer followed by its Dropout layer.
        stack = []
        for index, width in enumerate(layer_sizes):
            stack.append(
                tf.keras.layers.Dense(width, activation='relu', name=f'hidden_{index}')
            )
            stack.append(tf.keras.layers.Dropout(0.2, name=f'dropout_{index}'))
        self.hidden_layers = stack
        self.output_layer = tf.keras.layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=None):
        """Apply every hidden layer in order, then the output layer."""
        activations = inputs
        for hidden_layer in self.hidden_layers:
            activations = hidden_layer(activations, training=training)
        return self.output_layer(activations)

    def get_config(self):
        """Return the constructor arguments of this model."""
        return dict(layer_sizes=self.layer_sizes, num_classes=self.num_classes)
# 创建动态模型
dynamic_model = DynamicModel([128, 64, 32], num_classes=10)
# 测试模型
test_input = tf.random.normal([10, 784])
output = dynamic_model(test_input)
print(f"动态模型输出形状: {output.shape}")

条件执行模型
python
class ConditionalModel(tf.keras.Model):
    """Classifier that routes its input through one of two branches.

    Args:
        num_classes: Number of units in the final softmax layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.num_classes = num_classes
        layers = tf.keras.layers
        # Branch A: wider ReLU layer with heavier dropout.
        self.branch_a = tf.keras.Sequential([
            layers.Dense(128, activation='relu'),
            layers.Dropout(0.3),
        ])
        # Branch B: narrower tanh layer with lighter dropout.
        self.branch_b = tf.keras.Sequential([
            layers.Dense(64, activation='tanh'),
            layers.Dropout(0.2),
        ])
        self.classifier = layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=None, use_branch_a=True):
        """Classify ``inputs`` via branch A when ``use_branch_a`` is truthy, else branch B."""
        branch = self.branch_a if use_branch_a else self.branch_b
        return self.classifier(branch(inputs, training=training))
# 使用条件模型
conditional_model = ConditionalModel()
test_input = tf.random.normal([5, 100])
# 使用不同分支
output_a = conditional_model(test_input, use_branch_a=True)
output_b = conditional_model(test_input, use_branch_a=False)
print(f"分支A输出: {output_a.shape}")
print(f"分支B输出: {output_b.shape}")

模型组合与集成
模型堆叠
python
# 创建多个基础模型
model1 = tf.keras.Sequential([
tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dense(10, activation='softmax')
])
model2 = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='tanh', input_shape=(784,)),
tf.keras.layers.Dense(64, activation='tanh'),
tf.keras.layers.Dense(10, activation='softmax')
])
# 集成模型
class EnsembleModel(tf.keras.Model):
    """Ensemble that averages the outputs of several sub-models.

    Args:
        models: Callables that each map the same input to a prediction
            tensor; their outputs are stacked, so shapes must match.
    """

    def __init__(self, models):
        super().__init__()
        self.models = models

    def call(self, inputs):
        """Return the element-wise mean of all member predictions."""
        stacked = tf.stack([member(inputs) for member in self.models])
        # Axis 0 is the ensemble-member axis introduced by tf.stack.
        return tf.reduce_mean(stacked, axis=0)
# 创建集成模型
ensemble = EnsembleModel([model1, model2])
# 测试集成模型
test_input = tf.random.normal([5, 784])
ensemble_output = ensemble(test_input)
print(f"集成模型输出: {ensemble_output.shape}")

加权集成
python
class WeightedEnsemble(tf.keras.Model):
    """Ensemble that combines sub-model outputs with trainable weights.

    The trainable variable ``ensemble_weights`` holds logits; the effective
    mixing weights are ``softmax(ensemble_weights)``. The logits are
    initialised to ``log(weights)`` so that the initial mixing weights equal
    the supplied ``weights`` normalised to sum to one (using the raw weights
    as logits would turn ``[0.6, 0.4]`` into roughly ``[0.55, 0.45]``).

    Args:
        models: Callables that each map the same input to a prediction
            tensor of identical shape.
        weights: Optional positive initial mixing weights, one per model.
            Defaults to uniform weights.

    Raises:
        ValueError: If ``weights`` has a different length than ``models`` or
            contains a non-positive value.
    """

    def __init__(self, models, weights=None):
        super().__init__()
        self.models = models
        if weights is None:
            weights = [1.0 / len(models)] * len(models)
        if len(weights) != len(models):
            raise ValueError(
                f"Expected {len(models)} weights, got {len(weights)}."
            )
        if any(weight <= 0 for weight in weights):
            raise ValueError("Ensemble weights must be strictly positive.")
        # softmax(log(w)) == w / sum(w), so the initial mix matches `weights`.
        initial_logits = tf.math.log(tf.constant(weights, dtype=tf.float32))
        self.ensemble_weights = tf.Variable(
            initial_logits, trainable=True, name='ensemble_weights'
        )

    def call(self, inputs):
        """Return the softmax-weighted sum of all member predictions."""
        predictions = tf.stack([model(inputs) for model in self.models])
        # Softmax keeps the mixing weights positive and summing to one
        # while the underlying logits are trained freely.
        normalized_weights = tf.nn.softmax(self.ensemble_weights)
        # The reshape targets rank-3 stacked predictions, i.e. rank-2
        # outputs from each member.
        weighted_pred = tf.reduce_sum(
            predictions * tf.reshape(normalized_weights, [-1, 1, 1]),
            axis=0,
        )
        return weighted_pred
# 创建加权集成
weighted_ensemble = WeightedEnsemble([model1, model2], weights=[0.6, 0.4])
weighted_output = weighted_ensemble(test_input)
print(f"加权集成输出: {weighted_output.shape}")

模型调试与可视化
模型结构可视化
python
# 创建一个复杂模型用于可视化
def create_complex_model():
    """Build a two-branch functional model over a 784-feature input.

    Branch 1 is Dense(128) -> Dropout(0.2) -> Dense(64); branch 2 is
    Dense(64, tanh) -> Dropout(0.3). Both are concatenated and fed to a
    10-way softmax layer. Every layer is explicitly named so it can be
    looked up with ``get_layer``.

    Returns:
        The ``tf.keras.Model`` named ``'complex_model'``.
    """
    layers = tf.keras.layers
    model_input = tf.keras.Input(shape=(784,), name='input')
    # First branch.
    first = layers.Dense(128, activation='relu', name='branch1_dense1')(model_input)
    first = layers.Dropout(0.2, name='branch1_dropout')(first)
    first = layers.Dense(64, activation='relu', name='branch1_dense2')(first)
    # Second branch.
    second = layers.Dense(64, activation='tanh', name='branch2_dense1')(model_input)
    second = layers.Dropout(0.3, name='branch2_dropout')(second)
    # Merge and classify.
    joined = layers.concatenate([first, second], name='merge')
    model_output = layers.Dense(10, activation='softmax', name='output')(joined)
    return tf.keras.Model(inputs=model_input, outputs=model_output, name='complex_model')
complex_model = create_complex_model()
# 可视化模型结构
tf.keras.utils.plot_model(
complex_model,
to_file='model_structure.png',
show_shapes=True,
show_layer_names=True,
rankdir='TB'
)
print("模型结构图已保存为 model_structure.png")
# 详细的模型信息
complex_model.summary()
# Print name and class of every layer, plus units/activation where present.
print("\n层详细信息:")
for i, layer in enumerate(complex_model.layers):
    print(f"层 {i}: {layer.name} ({layer.__class__.__name__})")
    # Not every layer type has these attributes, hence the hasattr checks.
    if hasattr(layer, 'units'):
        print(f" 单元数: {layer.units}")
    if hasattr(layer, 'activation'):
        print(f" 激活函数: {layer.activation.__name__}")

中间层输出检查
python
# 创建中间层输出模型
def create_intermediate_model(base_model, layer_names):
    """Build a model that exposes the outputs of selected layers.

    Args:
        base_model: Model whose layers are looked up with ``get_layer``.
        layer_names: Names of the layers whose outputs should be returned.

    Returns:
        A ``tf.keras.Model`` sharing ``base_model``'s input and producing one
        output per requested layer, in the order of ``layer_names``.
    """
    probes = []
    for layer_name in layer_names:
        probes.append(base_model.get_layer(layer_name).output)
    return tf.keras.Model(inputs=base_model.input, outputs=probes)
# Build a probe model exposing three named layers of complex_model.
layer_names = ['branch1_dense1', 'branch2_dense1', 'merge']
intermediate_model = create_intermediate_model(complex_model, layer_names)
# Run a single random sample through the probe model.
test_input = tf.random.normal([1, 784])
intermediate_outputs = intermediate_model(test_input)
print("中间层输出:")
# Outputs come back in the same order as layer_names.
for name, output in zip(layer_names, intermediate_outputs):
    print(f"{name}: {output.shape}")

模型保存与加载
完整模型保存
python
# 训练一个简单模型
simple_model = tf.keras.Sequential([
tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
tf.keras.layers.Dense(10, activation='softmax')
])
simple_model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# 创建一些假数据进行训练
fake_x = tf.random.normal([100, 784])
fake_y = tf.random.uniform([100], maxval=10, dtype=tf.int32)
simple_model.fit(fake_x, fake_y, epochs=1, verbose=0)
# 保存完整模型
simple_model.save('complete_model.h5')
print("完整模型已保存")
# 加载完整模型
loaded_model = tf.keras.models.load_model('complete_model.h5')
print("模型加载成功")
# 验证加载的模型
test_pred = loaded_model.predict(fake_x[:5], verbose=0)
print(f"预测结果形状: {test_pred.shape}")

仅保存权重
python
# 保存模型权重
simple_model.save_weights('model_weights.h5')
print("权重已保存")
# 创建相同结构的新模型
new_model = tf.keras.Sequential([
tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
tf.keras.layers.Dense(10, activation='softmax')
])
# 加载权重
new_model.load_weights('model_weights.h5')
print("权重加载成功")
# 验证权重是否相同
original_pred = simple_model.predict(fake_x[:1], verbose=0)
new_pred = new_model.predict(fake_x[:1], verbose=0)
print(f"预测结果是否相同: {np.allclose(original_pred, new_pred)}")

SavedModel格式
python
# Export in the SavedModel format (recommended for production serving).
tf.saved_model.save(simple_model, 'saved_model_dir')
print("SavedModel格式已保存")
# Reload the SavedModel and fetch its default serving signature.
loaded_saved_model = tf.saved_model.load('saved_model_dir')
inference_func = loaded_saved_model.signatures['serving_default']
# The signature's keyword name is derived from the auto-generated name of the
# first layer (e.g. 'dense_20_input'), which depends on how many Dense layers
# were created earlier in the process. Read it from the signature instead of
# hard-coding 'dense_input'.
_, signature_kwargs = inference_func.structured_input_signature
input_name = next(iter(signature_kwargs))
test_input_dict = {input_name: tf.constant(fake_x[:1])}
saved_model_pred = inference_func(**test_input_dict)
print(f"SavedModel预测结果: {list(saved_model_pred.values())[0].shape}")

最佳实践
1. 模型设计原则
python
# 好的实践:模块化设计
class ModelBlock(tf.keras.layers.Layer):
    """Reusable Dense -> BatchNormalization -> Dropout building block.

    Args:
        units: Width of the ReLU ``Dense`` layer.
        dropout_rate: Rate passed to the ``Dropout`` layer.
    """

    def __init__(self, units, dropout_rate=0.2):
        super().__init__()
        layers = tf.keras.layers
        self.dense = layers.Dense(units, activation='relu')
        self.dropout = layers.Dropout(dropout_rate)
        self.batch_norm = layers.BatchNormalization()

    def call(self, inputs, training=None):
        """Apply the block; ``training`` reaches batch norm and dropout."""
        normalized = self.batch_norm(self.dense(inputs), training=training)
        return self.dropout(normalized, training=training)
# 使用模块化块构建模型
modular_model = tf.keras.Sequential([
ModelBlock(128),
ModelBlock(64),
ModelBlock(32),
tf.keras.layers.Dense(10, activation='softmax')
])

2. 性能优化
python
# 使用mixed precision训练
policy = tf.keras.mixed_precision.Policy('mixed_float16')
tf.keras.mixed_precision.set_global_policy(policy)
# 优化的模型结构
@tf.function
def optimized_model_call(model, inputs):
    """Run ``model`` on ``inputs`` in inference mode inside a ``tf.function``.

    Args:
        model: Callable accepting ``(inputs, training=...)``.
        inputs: Input passed to ``model`` unchanged.

    Returns:
        Whatever ``model`` returns with ``training=False``.
    """
    outputs = model(inputs, training=False)
    return outputs
# 批量归一化的正确使用
class OptimizedModel(tf.keras.Model):
def __init__(self):
super(OptimizedModel, self).__init__()
self.conv1 = tf.keras.layers.Conv2D(32, 3, padding='same')
self.bn1 = tf.keras.layers.BatchNormalization()
self.conv2 = tf.keras.layers.Conv2D(64, 3, padding='same')
self.bn2 = tf.keras.layers.BatchNormalization()
self.global_pool = tf.keras.layers.GlobalAveragePooling2D()
self.classifier = tf.keras.layers.Dense(10)
@tf.function
def call(self, inputs, training=None):
x = self.conv1(inputs)
x = self.bn1(x, training=training)
x = tf.nn.relu(x)
x = self.conv2(x)
x = self.bn2(x, training=training)
x = tf.nn.relu(x)
x = self.global_pool(x)
return self.classifier(x)3. 调试技巧
python
# 添加调试信息的模型
class DebuggableModel(tf.keras.Model):
    """Three-layer MLP with runtime checks on its input and activations.

    The final ``Dense`` layer has no activation, so the model returns raw
    scores.
    """

    def __init__(self):
        super().__init__()
        layers = tf.keras.layers
        self.dense1 = layers.Dense(64, activation='relu')
        self.dense2 = layers.Dense(32, activation='relu')
        self.output_layer = layers.Dense(10)

    def call(self, inputs, training=None):
        """Run the forward pass, asserting rank-2 input and finite activations."""
        # Reject inputs that are not rank 2.
        tf.debugging.assert_rank(inputs, 2, "输入必须是2维张量")
        hidden = self.dense1(inputs)
        tf.debugging.assert_all_finite(hidden, "dense1输出包含无效值")
        hidden = self.dense2(hidden)
        tf.debugging.assert_all_finite(hidden, "dense2输出包含无效值")
        outputs = self.output_layer(hidden)
        # Report the output range only when called in training mode.
        if training:
            lowest = tf.reduce_min(outputs)
            highest = tf.reduce_max(outputs)
            tf.print("训练模式 - 输出范围:", lowest, highest)
        return outputs
# 梯度检查
def check_gradients(model, inputs, targets):
    """Print the gradient norm of every trainable variable of ``model``.

    Runs one forward pass in training mode, computes sparse categorical
    cross-entropy against ``targets`` and differentiates it with respect to
    ``model.trainable_variables``. A warning is printed for every gradient
    whose norm exceeds 10.0. Variables without a gradient are skipped.

    Args:
        model: Callable accepting ``(inputs, training=True)`` and exposing
            ``trainable_variables``.
        inputs: Batch passed to ``model``.
        targets: Integer class labels for the batch.
    """
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        # The loss is left per-example, as in the original code.
        loss = tf.keras.losses.sparse_categorical_crossentropy(targets, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    for i, grad in enumerate(gradients):
        if grad is not None:
            grad_norm = tf.norm(grad)
            # Pass the tensor to tf.print directly: interpolating it into an
            # f-string would print its repr ("tf.Tensor(..., shape=(), ...)")
            # rather than the value.
            tf.print("层", i, "梯度范数:", grad_norm)
            if grad_norm > 10.0:
                tf.print(f"警告: 层 {i} 梯度过大!")

总结
TensorFlow提供了多种灵活的模型构建方式:
- Sequential API:适合简单的线性模型
- Functional API:适合复杂的网络结构,支持多输入输出
- 自定义层:实现特殊的计算逻辑
- 模型子类化:最大的灵活性,适合研究和复杂应用
- 模型组合:集成多个模型提升性能
选择合适的构建方式,遵循最佳实践,可以构建出高效、可维护的深度学习模型!