文本分类项目
本章将通过一个完整的文本分类项目,展示如何使用TensorFlow处理自然语言处理任务。我们将构建一个情感分析系统,能够判断文本的情感倾向。
项目概述
我们将构建一个多类别文本分类器,用于分析电影评论的情感(正面、负面、中性),并扩展到其他文本分类任务。
项目目标
- 掌握文本预处理技术
- 学习词嵌入和序列建模
- 构建多种文本分类模型
- 实现模型解释和可视化
- 部署文本分类服务
python
import tensorflow as tf
from tensorflow import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import re
import string
from collections import Counter
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
import warnings
warnings.filterwarnings('ignore')
# 设置随机种子
tf.random.set_seed(42)
np.random.seed(42)
print(f"TensorFlow版本: {tf.__version__}")
# 下载NLTK数据(首次运行时需要)
# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('wordnet')
数据准备
加载IMDB数据集
python
def load_imdb_data(num_words=10000, maxlen=500):
"""
加载IMDB电影评论数据集
"""
# 加载数据
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(
num_words=num_words
)
# 获取词汇表
word_index = keras.datasets.imdb.get_word_index()
# 反向词汇表(索引到单词)
reverse_word_index = {value: key for key, value in word_index.items()}
print(f"训练样本数: {len(x_train)}")
print(f"测试样本数: {len(x_test)}")
print(f"词汇表大小: {len(word_index)}")
return (x_train, y_train), (x_test, y_test), word_index, reverse_word_index
def decode_review(encoded_review, reverse_word_index):
"""
将编码的评论转换回文本
"""
# 注意:索引偏移了3,因为0、1、2是保留的特殊标记
return ' '.join([reverse_word_index.get(i - 3, '?') for i in encoded_review])
# 加载数据
(x_train, y_train), (x_test, y_test), word_index, reverse_word_index = load_imdb_data()
# 查看样本
print("原始评论示例:")
print(decode_review(x_train[0], reverse_word_index))
print(f"标签: {y_train[0]} ({'正面' if y_train[0] == 1 else '负面'})")自定义数据集处理
python
def load_custom_text_data(file_path):
"""
加载自定义文本数据
"""
# 假设CSV格式:text, label
df = pd.read_csv(file_path)
print(f"数据集大小: {len(df)}")
print(f"类别分布:\n{df['label'].value_counts()}")
return df
def preprocess_text(text):
"""
文本预处理函数
"""
# 转换为小写
text = text.lower()
# 移除HTML标签
text = re.sub(r'<[^>]+>', '', text)
# 移除URL
text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
# 移除用户名和邮箱
text = re.sub(r'@\w+|\b\w+@\w+\.\w+', '', text)
# 移除标点符号(保留一些有意义的)
text = re.sub(r'[^\w\s!?.]', '', text)
# 移除多余的空格
text = re.sub(r'\s+', ' ', text).strip()
return text
def advanced_text_preprocessing(texts, remove_stopwords=True,
use_stemming=False, use_lemmatization=True):
"""
高级文本预处理
"""
# 初始化工具
stop_words = set(stopwords.words('english')) if remove_stopwords else set()
stemmer = PorterStemmer() if use_stemming else None
lemmatizer = WordNetLemmatizer() if use_lemmatization else None
processed_texts = []
for text in texts:
# 基础预处理
text = preprocess_text(text)
# 分词
tokens = word_tokenize(text)
# 移除停用词
if remove_stopwords:
tokens = [token for token in tokens if token not in stop_words]
# 词干提取
if use_stemming and stemmer:
tokens = [stemmer.stem(token) for token in tokens]
# 词形还原
if use_lemmatization and lemmatizer:
tokens = [lemmatizer.lemmatize(token) for token in tokens]
# 重新组合
processed_text = ' '.join(tokens)
processed_texts.append(processed_text)
return processed_texts
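# 小示例(假设已按前面的注释下载了 punkt、stopwords、wordnet 等 NLTK 数据),
# 直观对比预处理前后的文本效果:
demo_raw = ["<p>I absolutely LOVED this movie!</p> Visit http://example.com for more."]
print("预处理前:", demo_raw[0])
print("预处理后:", advanced_text_preprocessing(demo_raw)[0])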
# 示例:处理自定义数据
def create_sample_dataset():
"""
创建示例数据集
"""
sample_data = {
'text': [
"This movie is absolutely fantastic! I loved every minute of it.",
"Terrible film, waste of time and money. Very disappointing.",
"The movie was okay, nothing special but not bad either.",
"Amazing cinematography and great acting. Highly recommended!",
"Boring and predictable plot. I fell asleep halfway through.",
"Decent movie with some good moments. Worth watching once."
],
'label': [1, 0, 2, 1, 0, 2] # 0: 负面, 1: 正面, 2: 中性
}
df = pd.DataFrame(sample_data)
return df
# 创建示例数据
sample_df = create_sample_dataset()
print("示例数据集:")
print(sample_df)
数据可视化
python
def visualize_text_data(texts, labels, label_names=None):
"""
可视化文本数据
"""
if label_names is None:
label_names = [f'类别 {i}' for i in range(len(np.unique(labels)))]
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 1. 类别分布
unique_labels, counts = np.unique(labels, return_counts=True)
axes[0, 0].bar([label_names[i] for i in unique_labels], counts)
axes[0, 0].set_title('类别分布')
axes[0, 0].set_xlabel('类别')
axes[0, 0].set_ylabel('样本数量')
# 2. 文本长度分布
text_lengths = [len(text.split()) for text in texts]
axes[0, 1].hist(text_lengths, bins=50, alpha=0.7)
axes[0, 1].set_title('文本长度分布')
axes[0, 1].set_xlabel('单词数量')
axes[0, 1].set_ylabel('频次')
axes[0, 1].axvline(np.mean(text_lengths), color='red', linestyle='--',
label=f'平均长度: {np.mean(text_lengths):.1f}')
axes[0, 1].legend()
# 3. 各类别文本长度分布
for i, label in enumerate(unique_labels):
label_texts = [texts[j] for j in range(len(texts)) if labels[j] == label]
label_lengths = [len(text.split()) for text in label_texts]
axes[1, 0].hist(label_lengths, bins=30, alpha=0.7,
label=label_names[label])
axes[1, 0].set_title('各类别文本长度分布')
axes[1, 0].set_xlabel('单词数量')
axes[1, 0].set_ylabel('频次')
axes[1, 0].legend()
# 4. 词频统计
all_words = ' '.join(texts).split()
word_freq = Counter(all_words)
top_words = word_freq.most_common(20)
words, freqs = zip(*top_words)
axes[1, 1].barh(range(len(words)), freqs)
axes[1, 1].set_yticks(range(len(words)))
axes[1, 1].set_yticklabels(words)
axes[1, 1].set_title('Top 20 高频词')
axes[1, 1].set_xlabel('频次')
plt.tight_layout()
plt.show()
# 可视化IMDB数据
imdb_texts = [decode_review(x_train[i], reverse_word_index) for i in range(1000)]
imdb_labels = y_train[:1000]
visualize_text_data(imdb_texts, imdb_labels, ['负面', '正面'])
文本向量化
词袋模型和TF-IDF
python
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
def create_bow_features(texts, max_features=10000):
"""
创建词袋模型特征
"""
vectorizer = CountVectorizer(
max_features=max_features,
stop_words='english',
ngram_range=(1, 2) # 包含1-gram和2-gram
)
features = vectorizer.fit_transform(texts)
feature_names = vectorizer.get_feature_names_out()
return features, vectorizer, feature_names
def create_tfidf_features(texts, max_features=10000):
"""
创建TF-IDF特征
"""
vectorizer = TfidfVectorizer(
max_features=max_features,
stop_words='english',
ngram_range=(1, 2),
min_df=2, # 忽略出现次数少于2的词
max_df=0.95 # 忽略出现在95%以上文档中的词
)
features = vectorizer.fit_transform(texts)
feature_names = vectorizer.get_feature_names_out()
return features, vectorizer, feature_names
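# 补充示例(可选,非原流程的一部分):TF-IDF 特征也可以直接输入 scikit-learn
# 中的传统分类器,作为神经网络模型的快速对照基线。下面是一个最小示意:
from sklearn.linear_model import LogisticRegression

def train_tfidf_baseline(texts, labels, max_features=10000):
    """
    用 TF-IDF + 逻辑回归训练一个简单基线(仅作演示)
    """
    features, vectorizer, _ = create_tfidf_features(texts, max_features=max_features)
    clf = LogisticRegression(max_iter=1000)
    clf.fit(features, labels)
    print(f"基线训练集准确率: {clf.score(features, labels):.4f}")
    return clf, vectorizer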
# 示例使用
sample_texts = [decode_review(x_train[i], reverse_word_index) for i in range(100)]
bow_features, bow_vectorizer, bow_names = create_bow_features(sample_texts)
tfidf_features, tfidf_vectorizer, tfidf_names = create_tfidf_features(sample_texts)
print(f"词袋模型特征形状: {bow_features.shape}")
print(f"TF-IDF特征形状: {tfidf_features.shape}")序列化和填充
python
def create_sequences(texts, tokenizer=None, max_words=10000, max_len=500):
"""
将文本转换为序列
"""
if tokenizer is None:
tokenizer = keras.preprocessing.text.Tokenizer(
num_words=max_words,
oov_token="<OOV>"
)
tokenizer.fit_on_texts(texts)
sequences = tokenizer.texts_to_sequences(texts)
padded_sequences = keras.preprocessing.sequence.pad_sequences(
sequences, maxlen=max_len, padding='post', truncating='post'
)
return padded_sequences, tokenizer
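# 另一种可选做法:较新版本的 Keras 提供了 TextVectorization 层,可以在模型内部
# 完成"分词 + 序列化 + 填充"。下面是一个最小示意(参数与上面的 Tokenizer 对应):
def create_text_vectorizer(texts, max_words=10000, max_len=500):
    vectorize_layer = keras.layers.TextVectorization(
        max_tokens=max_words,
        output_mode='int',
        output_sequence_length=max_len
    )
    vectorize_layer.adapt(texts)  # 作用类似 tokenizer.fit_on_texts
    return vectorize_layer
# 使用示例:create_text_vectorizer(texts)(tf.constant(["some new text"])) 会返回填充后的整数序列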
def analyze_sequence_lengths(sequences):
"""
分析序列长度分布
"""
lengths = [len(seq) for seq in sequences]
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.hist(lengths, bins=50, alpha=0.7)
plt.axvline(np.mean(lengths), color='red', linestyle='--',
label=f'平均长度: {np.mean(lengths):.1f}')
plt.axvline(np.percentile(lengths, 95), color='orange', linestyle='--',
label=f'95%分位数: {np.percentile(lengths, 95):.1f}')
plt.title('序列长度分布')
plt.xlabel('序列长度')
plt.ylabel('频次')
plt.legend()
plt.subplot(1, 2, 2)
plt.boxplot(lengths)
plt.title('序列长度箱线图')
plt.ylabel('序列长度')
plt.tight_layout()
plt.show()
print(f"序列长度统计:")
print(f"最小长度: {np.min(lengths)}")
print(f"最大长度: {np.max(lengths)}")
print(f"平均长度: {np.mean(lengths):.2f}")
print(f"中位数长度: {np.median(lengths):.2f}")
print(f"95%分位数: {np.percentile(lengths, 95):.2f}")
# 分析IMDB序列长度
analyze_sequence_lengths([x_train[i] for i in range(1000)])
模型构建
基础神经网络模型
python
def create_dense_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
"""
创建基于全连接层的文本分类模型
"""
model = keras.Sequential([
keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
keras.layers.GlobalAveragePooling1D(),
keras.layers.Dense(128, activation='relu'),
keras.layers.Dropout(0.5),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes if num_classes > 2 else 1, activation='softmax' if num_classes > 2 else 'sigmoid')  # 二分类时输出单个sigmoid单元
])
return model
def create_cnn_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
"""
创建CNN文本分类模型
"""
model = keras.Sequential([
keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
# 多个卷积核
keras.layers.Conv1D(128, 3, activation='relu'),
keras.layers.GlobalMaxPooling1D(),
keras.layers.Dropout(0.5),
keras.layers.Dense(128, activation='relu'),
keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes if num_classes > 2 else 1, activation='softmax' if num_classes > 2 else 'sigmoid')
])
return model
def create_multi_cnn_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
"""
创建多尺度CNN模型
"""
# 输入层
inputs = keras.layers.Input(shape=(max_length,))
# 嵌入层
embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)
# 多个不同尺寸的卷积核
conv_blocks = []
filter_sizes = [3, 4, 5]
for filter_size in filter_sizes:
conv = keras.layers.Conv1D(128, filter_size, activation='relu')(embedding)
pool = keras.layers.GlobalMaxPooling1D()(conv)
conv_blocks.append(pool)
# 合并所有卷积块
merged = keras.layers.Concatenate()(conv_blocks)
# 全连接层
dense = keras.layers.Dense(128, activation='relu')(merged)
dropout = keras.layers.Dropout(0.5)(dense)
    outputs = keras.layers.Dense(num_classes if num_classes > 2 else 1,
                                 activation='softmax' if num_classes > 2 else 'sigmoid')(dropout)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
# 创建模型
vocab_size = 10000
embedding_dim = 128
max_length = 500
num_classes = 2
dense_model = create_dense_model(vocab_size, embedding_dim, max_length, num_classes)
cnn_model = create_cnn_model(vocab_size, embedding_dim, max_length, num_classes)
multi_cnn_model = create_multi_cnn_model(vocab_size, embedding_dim, max_length, num_classes)
print("Dense模型结构:")
dense_model.summary()
RNN和LSTM模型
python
def create_lstm_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
"""
创建LSTM文本分类模型
"""
model = keras.Sequential([
keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
keras.layers.LSTM(128, dropout=0.5, recurrent_dropout=0.5),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes if num_classes > 2 else 1, activation='softmax' if num_classes > 2 else 'sigmoid')
])
return model
def create_bidirectional_lstm_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
"""
创建双向LSTM模型
"""
model = keras.Sequential([
keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
keras.layers.Bidirectional(keras.layers.LSTM(64, dropout=0.5, recurrent_dropout=0.5)),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes if num_classes > 2 else 1, activation='softmax' if num_classes > 2 else 'sigmoid')
])
return model
def create_hierarchical_attention_model(vocab_size, embedding_dim=128, max_length=500, num_classes=2):
"""
创建带注意力机制的分层模型
"""
# 注意力层
class AttentionLayer(keras.layers.Layer):
def __init__(self, attention_dim):
super(AttentionLayer, self).__init__()
self.attention_dim = attention_dim
self.W = keras.layers.Dense(attention_dim)
self.V = keras.layers.Dense(1)
def call(self, inputs):
# inputs shape: (batch_size, time_steps, features)
score = self.V(tf.nn.tanh(self.W(inputs)))
attention_weights = tf.nn.softmax(score, axis=1)
context_vector = attention_weights * inputs
context_vector = tf.reduce_sum(context_vector, axis=1)
return context_vector
inputs = keras.layers.Input(shape=(max_length,))
# 嵌入层
embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)
# 双向LSTM
lstm_out = keras.layers.Bidirectional(
keras.layers.LSTM(64, return_sequences=True, dropout=0.5, recurrent_dropout=0.5)
)(embedding)
# 注意力层
attention_out = AttentionLayer(64)(lstm_out)
# 分类层
dense = keras.layers.Dense(64, activation='relu')(attention_out)
dropout = keras.layers.Dropout(0.5)(dense)
    outputs = keras.layers.Dense(num_classes if num_classes > 2 else 1,
                                 activation='softmax' if num_classes > 2 else 'sigmoid')(dropout)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
# 创建RNN模型
lstm_model = create_lstm_model(vocab_size, embedding_dim, max_length, num_classes)
bilstm_model = create_bidirectional_lstm_model(vocab_size, embedding_dim, max_length, num_classes)
attention_model = create_hierarchical_attention_model(vocab_size, embedding_dim, max_length, num_classes)
print("LSTM模型结构:")
lstm_model.summary()
Transformer模型
python
def create_transformer_classifier(vocab_size, embedding_dim=128, max_length=500,
num_heads=8, ff_dim=512, num_classes=2):
"""
创建Transformer文本分类模型
"""
inputs = keras.layers.Input(shape=(max_length,))
# 嵌入层
embedding = keras.layers.Embedding(vocab_size, embedding_dim)(inputs)
# 位置编码
positions = tf.range(start=0, limit=max_length, delta=1)
position_embedding = keras.layers.Embedding(max_length, embedding_dim)(positions)
x = embedding + position_embedding
# Transformer编码器
attention_output = keras.layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embedding_dim
)(x, x)
# 残差连接和层归一化
x = keras.layers.LayerNormalization()(x + attention_output)
# 前馈网络
ffn_output = keras.layers.Dense(ff_dim, activation='relu')(x)
ffn_output = keras.layers.Dense(embedding_dim)(ffn_output)
# 残差连接和层归一化
x = keras.layers.LayerNormalization()(x + ffn_output)
# 全局平均池化
x = keras.layers.GlobalAveragePooling1D()(x)
# 分类层
x = keras.layers.Dropout(0.1)(x)
    outputs = keras.layers.Dense(num_classes if num_classes > 2 else 1,
                                 activation='softmax' if num_classes > 2 else 'sigmoid')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
# 创建Transformer模型
transformer_model = create_transformer_classifier(vocab_size, embedding_dim, max_length, num_classes)
print("Transformer模型结构:")
transformer_model.summary()
预训练词嵌入
使用GloVe词嵌入
python
def load_glove_embeddings(glove_file, word_index, embedding_dim=100):
"""
加载GloVe预训练词嵌入
"""
embeddings_index = {}
with open(glove_file, 'r', encoding='utf-8') as f:
for line in f:
values = line.split()
word = values[0]
coefs = np.asarray(values[1:], dtype='float32')
embeddings_index[word] = coefs
print(f'找到 {len(embeddings_index)} 个词向量')
# 创建嵌入矩阵
vocab_size = len(word_index) + 1
embedding_matrix = np.zeros((vocab_size, embedding_dim))
for word, i in word_index.items():
if i < vocab_size:
embedding_vector = embeddings_index.get(word)
if embedding_vector is not None:
embedding_matrix[i] = embedding_vector
return embedding_matrix
def create_model_with_pretrained_embeddings(vocab_size, embedding_matrix,
max_length=500, num_classes=2):
"""
创建使用预训练词嵌入的模型
"""
embedding_dim = embedding_matrix.shape[1]
model = keras.Sequential([
keras.layers.Embedding(
vocab_size,
embedding_dim,
weights=[embedding_matrix],
input_length=max_length,
trainable=False # 冻结预训练嵌入
),
keras.layers.Bidirectional(keras.layers.LSTM(64, dropout=0.5, recurrent_dropout=0.5)),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dropout(0.5),
        keras.layers.Dense(num_classes if num_classes > 2 else 1, activation='softmax' if num_classes > 2 else 'sigmoid')
])
return model
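# 可选的后续步骤(示意):先用冻结的预训练嵌入训练若干轮,再解冻嵌入层并以
# 更小的学习率重新编译、继续微调,通常还能进一步提升效果。
def unfreeze_and_finetune(model, learning_rate=1e-5):
    """
    解冻嵌入层并用较小学习率重新编译,用于微调预训练词向量(示意)
    """
    model.layers[0].trainable = True
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    return model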
# 使用示例(需要下载GloVe文件)
# glove_file = 'glove.6B.100d.txt'
# embedding_matrix = load_glove_embeddings(glove_file, word_index)
# pretrained_model = create_model_with_pretrained_embeddings(vocab_size, embedding_matrix, max_length, num_classes)
模型训练
数据准备和训练
python
def prepare_imdb_data(max_words=10000, max_len=500):
"""
准备IMDB数据用于训练
"""
# 加载数据
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(num_words=max_words)
# 填充序列
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=max_len)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=max_len)
return (x_train, y_train), (x_test, y_test)
def compile_and_train_model(model, x_train, y_train, x_val, y_val,
epochs=10, batch_size=32, model_name='text_classifier'):
"""
编译和训练模型
"""
# 编译模型
if len(np.unique(y_train)) > 2:
loss = 'sparse_categorical_crossentropy'
metrics = ['accuracy']
else:
loss = 'binary_crossentropy'
metrics = ['accuracy']
model.compile(
optimizer='adam',
loss=loss,
metrics=metrics
)
# 回调函数
callbacks = [
keras.callbacks.EarlyStopping(
monitor='val_accuracy',
patience=3,
restore_best_weights=True
),
keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.2,
patience=2,
min_lr=1e-6
),
keras.callbacks.ModelCheckpoint(
f'best_{model_name}.h5',
monitor='val_accuracy',
save_best_only=True
)
]
# 训练模型
history = model.fit(
x_train, y_train,
batch_size=batch_size,
epochs=epochs,
validation_data=(x_val, y_val),
callbacks=callbacks,
verbose=1
)
return history
def plot_training_history(history):
"""
绘制训练历史
"""
fig, axes = plt.subplots(1, 2, figsize=(15, 5))
# 损失
axes[0].plot(history.history['loss'], label='训练损失')
axes[0].plot(history.history['val_loss'], label='验证损失')
axes[0].set_title('模型损失')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('损失')
axes[0].legend()
# 准确率
axes[1].plot(history.history['accuracy'], label='训练准确率')
axes[1].plot(history.history['val_accuracy'], label='验证准确率')
axes[1].set_title('模型准确率')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('准确率')
axes[1].legend()
plt.tight_layout()
plt.show()
# 准备数据并训练
(x_train, y_train), (x_test, y_test) = prepare_imdb_data()
# 分割验证集
x_train, x_val, y_train, y_val = train_test_split(
x_train, y_train, test_size=0.2, random_state=42
)
print(f"训练集大小: {x_train.shape}")
print(f"验证集大小: {x_val.shape}")
print(f"测试集大小: {x_test.shape}")
# 训练LSTM模型
print("训练LSTM模型...")
history = compile_and_train_model(
lstm_model, x_train, y_train, x_val, y_val,
epochs=10, model_name='lstm_classifier'
)
plot_training_history(history)
模型评估
性能评估
python
def evaluate_text_classifier(model, x_test, y_test, class_names=None):
"""
评估文本分类器性能
"""
# 预测
y_pred_proba = model.predict(x_test)
if len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1:
y_pred = np.argmax(y_pred_proba, axis=1)
else:
y_pred = (y_pred_proba > 0.5).astype(int).flatten()
# 计算指标
test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)
print(f"测试损失: {test_loss:.4f}")
print(f"测试准确率: {test_accuracy:.4f}")
# 分类报告
if class_names is None:
class_names = [f'类别 {i}' for i in range(len(np.unique(y_test)))]
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=class_names))
return y_pred, y_pred_proba
def plot_confusion_matrix_text(y_true, y_pred, class_names):
"""
绘制文本分类混淆矩阵
"""
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=class_names, yticklabels=class_names)
plt.title('混淆矩阵')
plt.xlabel('预测类别')
plt.ylabel('真实类别')
plt.show()
def analyze_prediction_confidence(y_pred_proba, y_true, y_pred):
"""
分析预测置信度
"""
if len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1:
confidences = np.max(y_pred_proba, axis=1)
else:
confidences = np.maximum(y_pred_proba.flatten(), 1 - y_pred_proba.flatten())
correct_mask = (y_true == y_pred)
plt.figure(figsize=(12, 5))
# 置信度分布
plt.subplot(1, 2, 1)
plt.hist(confidences[correct_mask], bins=30, alpha=0.7, label='正确预测', color='green')
plt.hist(confidences[~correct_mask], bins=30, alpha=0.7, label='错误预测', color='red')
plt.xlabel('预测置信度')
plt.ylabel('频次')
plt.title('预测置信度分布')
plt.legend()
# 置信度vs准确率
plt.subplot(1, 2, 2)
confidence_bins = np.linspace(0.5, 1, 11)
bin_accuracies = []
for i in range(len(confidence_bins) - 1):
mask = (confidences >= confidence_bins[i]) & (confidences < confidence_bins[i + 1])
if np.sum(mask) > 0:
accuracy = np.mean(correct_mask[mask])
bin_accuracies.append(accuracy)
else:
bin_accuracies.append(0)
bin_centers = (confidence_bins[:-1] + confidence_bins[1:]) / 2
plt.bar(bin_centers, bin_accuracies, width=0.04, alpha=0.7)
plt.xlabel('置信度区间')
plt.ylabel('准确率')
plt.title('置信度vs准确率')
plt.tight_layout()
plt.show()
# 评估模型
class_names = ['负面', '正面']
y_pred, y_pred_proba = evaluate_text_classifier(lstm_model, x_test, y_test, class_names)
plot_confusion_matrix_text(y_test, y_pred, class_names)
analyze_prediction_confidence(y_pred_proba, y_test, y_pred)
错误分析
python
def analyze_misclassified_examples(x_test, y_test, y_pred, y_pred_proba,
reverse_word_index, num_examples=10):
"""
分析错误分类的样本
"""
# 找出错误分类的样本
incorrect_indices = np.where(y_test != y_pred)[0]
# 按置信度排序,选择高置信度但错误的预测
if len(y_pred_proba.shape) > 1 and y_pred_proba.shape[1] > 1:
confidences = np.max(y_pred_proba, axis=1)
else:
confidences = np.maximum(y_pred_proba.flatten(), 1 - y_pred_proba.flatten())
incorrect_confidences = confidences[incorrect_indices]
sorted_indices = incorrect_indices[np.argsort(incorrect_confidences)[::-1]]
print("高置信度错误预测样本:")
print("=" * 80)
for i, idx in enumerate(sorted_indices[:num_examples]):
text = decode_review(x_test[idx], reverse_word_index)
true_label = '正面' if y_test[idx] == 1 else '负面'
pred_label = '正面' if y_pred[idx] == 1 else '负面'
confidence = confidences[idx]
print(f"\n样本 {i+1}:")
print(f"真实标签: {true_label}")
print(f"预测标签: {pred_label}")
print(f"置信度: {confidence:.3f}")
print(f"文本: {text[:200]}...")
print("-" * 80)
def find_important_words(model, tokenizer, text, class_index=1, num_words=10):
"""
找出对预测最重要的词汇(简单的梯度方法)
"""
# 将文本转换为序列
sequence = tokenizer.texts_to_sequences([text])
padded_sequence = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=500)
# 获取嵌入层权重
embedding_layer = model.layers[0]
with tf.GradientTape() as tape:
# 获取嵌入
embeddings = embedding_layer(padded_sequence)
tape.watch(embeddings)
        # 前向传播:从被 watch 的 embeddings 出发、依次通过其余层(假设模型为 Sequential 结构),
        # 若直接调用 model(padded_sequence),梯度无法回传到 embeddings
        x = embeddings
        for layer in model.layers[1:]:
            x = layer(x)
        predictions = x
# 获取目标类别的预测值
if len(predictions.shape) > 1 and predictions.shape[1] > 1:
target_output = predictions[:, class_index]
else:
target_output = predictions[:, 0] if class_index == 1 else 1 - predictions[:, 0]
# 计算梯度
gradients = tape.gradient(target_output, embeddings)
# 计算每个词的重要性(梯度的L2范数)
word_importance = tf.norm(gradients, axis=-1).numpy()[0]
# 获取词汇
words = []
for token_id in padded_sequence[0]:
if token_id > 0: # 忽略填充
word = tokenizer.index_word.get(token_id, '<UNK>')
words.append(word)
else:
words.append('<PAD>')
# 找出最重要的词
word_scores = list(zip(words, word_importance))
word_scores = [(word, score) for word, score in word_scores if word not in ['<PAD>', '<UNK>']]
word_scores.sort(key=lambda x: x[1], reverse=True)
return word_scores[:num_words]
# 错误分析
analyze_misclassified_examples(x_test, y_test, y_pred, y_pred_proba, reverse_word_index)
模型解释和可视化
注意力可视化
python
def visualize_attention_weights(model, text, tokenizer, max_len=500):
"""
可视化注意力权重(适用于带注意力机制的模型)
"""
# 预处理文本
sequence = tokenizer.texts_to_sequences([text])
padded_sequence = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=max_len)
# 获取词汇
words = []
for token_id in padded_sequence[0]:
if token_id > 0:
word = tokenizer.index_word.get(token_id, '<UNK>')
words.append(word)
else:
words.append('<PAD>')
# 这里需要修改模型以输出注意力权重
# 示例:假设模型有attention_weights输出
try:
predictions, attention_weights = model.predict(padded_sequence)
# 可视化注意力权重
plt.figure(figsize=(15, 8))
# 只显示非填充的词
non_pad_indices = [i for i, word in enumerate(words) if word != '<PAD>']
display_words = [words[i] for i in non_pad_indices]
display_weights = attention_weights[0][non_pad_indices]
# 创建热力图
plt.imshow(display_weights.reshape(1, -1), cmap='Blues', aspect='auto')
plt.colorbar()
plt.xticks(range(len(display_words)), display_words, rotation=45, ha='right')
plt.yticks([])
plt.title('注意力权重可视化')
plt.tight_layout()
plt.show()
except:
print("该模型不支持注意力权重可视化")
def create_word_cloud(texts, labels, class_index=1):
"""
创建词云图
"""
from wordcloud import WordCloud
# 筛选特定类别的文本
class_texts = [texts[i] for i in range(len(texts)) if labels[i] == class_index]
combined_text = ' '.join(class_texts)
# 创建词云
wordcloud = WordCloud(
width=800,
height=400,
background_color='white',
max_words=100,
colormap='viridis'
).generate(combined_text)
plt.figure(figsize=(12, 6))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title(f'类别 {class_index} 词云图')
plt.show()
# 可视化示例
sample_texts = [decode_review(x_test[i], reverse_word_index) for i in range(100)]
sample_labels = y_test[:100]
# 创建正面和负面评论的词云
create_word_cloud(sample_texts, sample_labels, class_index=1) # 正面
create_word_cloud(sample_texts, sample_labels, class_index=0)  # 负面
模型部署
保存和加载模型
python
def save_text_classifier(model, tokenizer, model_path):
"""
保存文本分类模型和分词器
"""
# 保存模型
model.save(f'{model_path}.h5')
# 保存分词器
import pickle
with open(f'{model_path}_tokenizer.pickle', 'wb') as handle:
pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)
print(f"模型和分词器已保存到: {model_path}")
def load_text_classifier(model_path):
"""
加载文本分类模型和分词器
"""
import pickle
# 加载模型
model = keras.models.load_model(f'{model_path}.h5')
# 加载分词器
with open(f'{model_path}_tokenizer.pickle', 'rb') as handle:
tokenizer = pickle.load(handle)
return model, tokenizer
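# 补充(可选):若计划用 TensorFlow Serving 等工具部署,也可以导出 SavedModel 格式,
# 下面的目录路径仅为示例:
def export_saved_model(model, export_dir='saved_models/sentiment/1'):
    """
    以 SavedModel 格式导出模型(示意)
    """
    tf.saved_model.save(model, export_dir)
    print(f"SavedModel 已导出到: {export_dir}")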
def create_prediction_pipeline(model, tokenizer, max_len=500, class_names=None):
"""
创建预测管道
"""
if class_names is None:
class_names = ['负面', '正面']
def predict_text(text):
# 预处理文本
processed_text = preprocess_text(text)
# 转换为序列
sequence = tokenizer.texts_to_sequences([processed_text])
padded_sequence = keras.preprocessing.sequence.pad_sequences(sequence, maxlen=max_len)
# 预测
prediction = model.predict(padded_sequence)[0]
if len(prediction) > 1:
# 多类分类
predicted_class_idx = np.argmax(prediction)
confidence = float(prediction[predicted_class_idx])
predicted_class = class_names[predicted_class_idx]
# 所有类别的概率
all_probabilities = {class_names[i]: float(prediction[i])
for i in range(len(class_names))}
else:
# 二分类
confidence = float(prediction[0])
predicted_class = class_names[1] if confidence > 0.5 else class_names[0]
all_probabilities = {
class_names[0]: 1 - confidence,
class_names[1]: confidence
}
return {
'predicted_class': predicted_class,
'confidence': confidence,
'all_probabilities': all_probabilities,
'processed_text': processed_text
}
return predict_text
# 创建预测管道
# 首先需要创建分词器(使用训练数据)
sample_texts = [decode_review(x_train[i], reverse_word_index) for i in range(1000)]
_, tokenizer = create_sequences(sample_texts, max_words=10000, max_len=500)
# 保存模型
save_text_classifier(lstm_model, tokenizer, 'sentiment_classifier')
# 创建预测函数
predict_sentiment = create_prediction_pipeline(lstm_model, tokenizer, class_names=['负面', '正面'])
# 测试预测
test_texts = [
"This movie is absolutely fantastic! I loved every minute of it.",
"Terrible film, waste of time and money. Very disappointing.",
"The movie was okay, nothing special but not bad either."
]
for text in test_texts:
result = predict_sentiment(text)
print(f"文本: {text}")
print(f"预测: {result['predicted_class']} (置信度: {result['confidence']:.3f})")
print(f"所有概率: {result['all_probabilities']}")
print("-" * 50)Web API部署
python
def create_text_classification_api(model, tokenizer, class_names):
"""
创建文本分类Web API
"""
from flask import Flask, request, jsonify
import json
app = Flask(__name__)
predict_fn = create_prediction_pipeline(model, tokenizer, class_names=class_names)
@app.route('/predict', methods=['POST'])
def predict():
try:
data = request.get_json()
if 'text' not in data:
return jsonify({'error': '缺少text字段'}), 400
text = data['text']
result = predict_fn(text)
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/batch_predict', methods=['POST'])
def batch_predict():
try:
data = request.get_json()
if 'texts' not in data:
return jsonify({'error': '缺少texts字段'}), 400
texts = data['texts']
results = []
for text in texts:
result = predict_fn(text)
results.append(result)
return jsonify({'results': results})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'healthy'})
return app
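# 客户端调用示例(假设服务已在本地 5000 端口启动;requests 为第三方库,需单独安装):
def call_sentiment_api(text, url='http://localhost:5000/predict'):
    import requests
    response = requests.post(url, json={'text': text})
    response.raise_for_status()
    return response.json()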
# 创建API
# api_app = create_text_classification_api(lstm_model, tokenizer, ['负面', '正面'])
# api_app.run(host='0.0.0.0', port=5000, debug=True)
总结
本章通过一个完整的文本分类项目,展示了自然语言处理任务从数据准备到部署的端到端流程:
关键要点:
- 文本预处理:清洗、分词、序列化
- 特征工程:词袋模型、TF-IDF、词嵌入
- 模型架构:从简单神经网络到Transformer
- 训练优化:早停、学习率调度与Dropout正则化
- 模型评估:多维度性能分析和错误分析
- 模型解释:注意力可视化、重要词汇分析
- 部署应用:API服务和批量预测
最佳实践:
- 充分理解文本数据特征
- 选择合适的预处理策略
- 尝试多种模型架构
- 使用预训练词嵌入提升性能
- 进行详细的错误分析
- 考虑模型可解释性
- 设计高效的部署方案
下一章我们将学习时间序列预测,探索序列数据的另一个重要应用领域。