信号处理
SciPy 的 scipy.signal 模块提供了丰富的信号处理功能,包括滤波器设计、频谱分析、信号生成、卷积运算等。无论是处理音频信号、生物医学信号,还是进行通信系统分析,这个模块都提供了强大的工具。本章将详细介绍如何使用 SciPy 进行各种信号处理任务。
scipy.signal 模块概述
scipy.signal 模块包含以下主要功能:
- 信号生成和波形创建
- 数字滤波器设计和应用
- 频谱分析和变换
- 卷积和相关运算
- 窗函数和谱估计
- 系统分析和控制理论
python
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
import warnings
warnings.filterwarnings('ignore')
# 设置绘图样式
plt.style.use('seaborn-v0_8')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 查看 signal 模块的主要功能
print("scipy.signal 主要功能:")
functions = [attr for attr in dir(signal) if not attr.startswith('_')]
print(f"总共 {len(functions)} 个函数和类")
print("部分函数:", functions[:20])信号生成
1. 基本波形生成
python
# 时间轴设置
fs = 1000 # 采样频率 (Hz)
T = 2.0 # 信号持续时间 (秒)
t = np.linspace(0, T, int(fs * T), endpoint=False)
# 生成各种基本波形
freq1, freq2, freq3 = 10, 25, 50 # 频率 (Hz)
# 正弦波
sine_wave = np.sin(2 * np.pi * freq1 * t)
# 方波
square_wave = signal.square(2 * np.pi * freq2 * t)
# 锯齿波
sawtooth_wave = signal.sawtooth(2 * np.pi * freq3 * t)
# 三角波
triangle_wave = signal.sawtooth(2 * np.pi * freq2 * t, 0.5)
# 啁啾信号(频率扫描)
chirp_signal = signal.chirp(t, f0=1, f1=100, t1=T, method='linear')
# 高斯脉冲
gaussian_pulse = signal.gausspulse(t - T/2, fc=freq2)
# 可视化基本波形
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
axes = axes.flatten()
waveforms = [
(sine_wave, f'正弦波 ({freq1} Hz)'),
(square_wave, f'方波 ({freq2} Hz)'),
(sawtooth_wave, f'锯齿波 ({freq3} Hz)'),
(triangle_wave, f'三角波 ({freq2} Hz)'),
(chirp_signal, '啁啾信号 (1-100 Hz)'),
(gaussian_pulse, f'高斯脉冲 ({freq2} Hz)')
]
for i, (wave, title) in enumerate(waveforms):
axes[i].plot(t[:500], wave[:500], 'b-', linewidth=1.5) # 只显示前0.5秒
axes[i].set_title(title)
axes[i].set_xlabel('时间 (s)')
axes[i].set_ylabel('幅度')
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("基本波形生成完成")
print(f"采样频率: {fs} Hz")
print(f"信号长度: {len(t)} 个采样点")
print(f"时间分辨率: {1/fs:.3f} s")2. 复合信号和噪声
python
# 创建复合信号
np.random.seed(42)
# 多频率正弦波叠加
composite_signal = (np.sin(2 * np.pi * 5 * t) +
0.5 * np.sin(2 * np.pi * 15 * t) +
0.3 * np.sin(2 * np.pi * 30 * t))
# 添加噪声
noise_level = 0.2
noise = np.random.normal(0, noise_level, len(t))
noisy_signal = composite_signal + noise
# AM调制信号
carrier_freq = 100 # 载波频率
modulation_freq = 5 # 调制频率
modulation_depth = 0.5
am_signal = (1 + modulation_depth * np.sin(2 * np.pi * modulation_freq * t)) * \
np.sin(2 * np.pi * carrier_freq * t)
# FM调制信号
frequency_deviation = 20 # 频率偏移
fm_signal = np.sin(2 * np.pi * carrier_freq * t +
frequency_deviation * np.sin(2 * np.pi * modulation_freq * t))
# 可视化复合信号
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 复合信号
axes[0, 0].plot(t[:1000], composite_signal[:1000], 'b-', linewidth=1, label='原始信号')
axes[0, 0].plot(t[:1000], noisy_signal[:1000], 'r-', alpha=0.7, linewidth=1, label='含噪信号')
axes[0, 0].set_title('复合信号 (5Hz + 15Hz + 30Hz)')
axes[0, 0].set_xlabel('时间 (s)')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# AM调制
axes[0, 1].plot(t[:2000], am_signal[:2000], 'g-', linewidth=1)
axes[0, 1].set_title(f'AM调制信号 (载波: {carrier_freq}Hz, 调制: {modulation_freq}Hz)')
axes[0, 1].set_xlabel('时间 (s)')
axes[0, 1].set_ylabel('幅度')
axes[0, 1].grid(True, alpha=0.3)
# FM调制
axes[1, 0].plot(t[:2000], fm_signal[:2000], 'm-', linewidth=1)
axes[1, 0].set_title(f'FM调制信号 (载波: {carrier_freq}Hz, 调制: {modulation_freq}Hz)')
axes[1, 0].set_xlabel('时间 (s)')
axes[1, 0].set_ylabel('幅度')
axes[1, 0].grid(True, alpha=0.3)
# 噪声特性
axes[1, 1].hist(noise, bins=50, alpha=0.7, density=True, edgecolor='black')
axes[1, 1].set_title(f'噪声分布 (σ = {noise_level})')
axes[1, 1].set_xlabel('幅度')
axes[1, 1].set_ylabel('概率密度')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()频谱分析
1. 傅里叶变换
python
# 计算频谱
def compute_spectrum(signal_data, sampling_rate):
"""计算信号的频谱"""
N = len(signal_data)
frequencies = fftfreq(N, 1/sampling_rate)[:N//2]
spectrum = fft(signal_data)[:N//2]
magnitude = np.abs(spectrum)
phase = np.angle(spectrum)
power = magnitude**2
return frequencies, magnitude, phase, power
# 分析不同信号的频谱
signals_to_analyze = [
(composite_signal, '复合信号'),
(noisy_signal, '含噪信号'),
(am_signal, 'AM调制信号'),
(fm_signal, 'FM调制信号')
]
fig, axes = plt.subplots(4, 2, figsize=(15, 16))
for i, (sig, name) in enumerate(signals_to_analyze):
freqs, mag, phase, power = compute_spectrum(sig, fs)
# 时域信号
axes[i, 0].plot(t[:1000], sig[:1000], 'b-', linewidth=1)
axes[i, 0].set_title(f'{name} - 时域')
axes[i, 0].set_xlabel('时间 (s)')
axes[i, 0].set_ylabel('幅度')
axes[i, 0].grid(True, alpha=0.3)
# 频域信号
axes[i, 1].plot(freqs, mag, 'r-', linewidth=1)
axes[i, 1].set_title(f'{name} - 频域')
axes[i, 1].set_xlabel('频率 (Hz)')
axes[i, 1].set_ylabel('幅度')
axes[i, 1].set_xlim(0, 150) # 只显示0-150Hz
axes[i, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 频谱特征分析
print("\n频谱分析结果:")
for sig, name in signals_to_analyze:
freqs, mag, _, power = compute_spectrum(sig, fs)
# 找到主要频率成分
peak_indices = signal.find_peaks(mag, height=np.max(mag)*0.1)[0]
peak_freqs = freqs[peak_indices]
peak_mags = mag[peak_indices]
print(f"\n{name}:")
print(f" 主要频率成分: {peak_freqs[:5]} Hz")
print(f" 对应幅度: {peak_mags[:5]}")
print(f" 总功率: {np.sum(power):.2f}")2. 短时傅里叶变换 (STFT)
python
# 创建时变信号
t_stft = np.linspace(0, 4, 4000)
time_varying_signal = np.zeros_like(t_stft)
# 分段频率
for i, time_point in enumerate(t_stft):
if time_point < 1:
freq = 10
elif time_point < 2:
freq = 25
elif time_point < 3:
freq = 50
else:
freq = 75
time_varying_signal[i] = np.sin(2 * np.pi * freq * time_point)
# 添加噪声
time_varying_signal += 0.1 * np.random.normal(0, 1, len(t_stft))
# 计算短时傅里叶变换
f_stft, t_stft_axis, Zxx = signal.stft(time_varying_signal, fs=1000,
window='hann', nperseg=256, noverlap=128)
# 可视化STFT结果
fig, axes = plt.subplots(3, 1, figsize=(15, 12))
# 原始信号
axes[0].plot(t_stft, time_varying_signal, 'b-', linewidth=1)
axes[0].set_title('时变信号 (频率: 10→25→50→75 Hz)')
axes[0].set_xlabel('时间 (s)')
axes[0].set_ylabel('幅度')
axes[0].grid(True, alpha=0.3)
# STFT幅度谱
im1 = axes[1].pcolormesh(t_stft_axis, f_stft, np.abs(Zxx), shading='gouraud', cmap='viridis')
axes[1].set_title('短时傅里叶变换 - 幅度谱')
axes[1].set_xlabel('时间 (s)')
axes[1].set_ylabel('频率 (Hz)')
axes[1].set_ylim(0, 100)
plt.colorbar(im1, ax=axes[1], label='幅度')
# STFT相位谱
im2 = axes[2].pcolormesh(t_stft_axis, f_stft, np.angle(Zxx), shading='gouraud', cmap='hsv')
axes[2].set_title('短时傅里叶变换 - 相位谱')
axes[2].set_xlabel('时间 (s)')
axes[2].set_ylabel('频率 (Hz)')
axes[2].set_ylim(0, 100)
plt.colorbar(im2, ax=axes[2], label='相位 (弧度)')
plt.tight_layout()
plt.show()
print("\n短时傅里叶变换分析:")
print(f"时间分辨率: {t_stft_axis[1] - t_stft_axis[0]:.3f} s")
print(f"频率分辨率: {f_stft[1] - f_stft[0]:.3f} Hz")
print(f"窗长度: 256 个采样点")
print(f"重叠: 128 个采样点 (50%)")数字滤波器
1. IIR滤波器设计
python
# 创建测试信号
fs = 1000
t = np.linspace(0, 2, 2000, endpoint=False)
# 多频率信号
signal_clean = (np.sin(2*np.pi*5*t) + # 5 Hz - 保留
0.5*np.sin(2*np.pi*25*t) + # 25 Hz - 保留
0.3*np.sin(2*np.pi*60*t) + # 60 Hz - 滤除
0.2*np.sin(2*np.pi*120*t)) # 120 Hz - 滤除
# 添加噪声
noise = 0.1 * np.random.normal(0, 1, len(t))
signal_noisy = signal_clean + noise
# 设计不同类型的IIR滤波器
nyquist = fs / 2
# 1. 低通滤波器 (截止频率: 30 Hz)
lowpass_cutoff = 30 / nyquist
b_lp, a_lp = signal.butter(4, lowpass_cutoff, btype='low')
# 2. 高通滤波器 (截止频率: 10 Hz)
highpass_cutoff = 10 / nyquist
b_hp, a_hp = signal.butter(4, highpass_cutoff, btype='high')
# 3. 带通滤波器 (通带: 20-40 Hz)
bandpass_low = 20 / nyquist
bandpass_high = 40 / nyquist
b_bp, a_bp = signal.butter(4, [bandpass_low, bandpass_high], btype='band')
# 4. 带阻滤波器 (阻带: 55-65 Hz)
bandstop_low = 55 / nyquist
bandstop_high = 65 / nyquist
b_bs, a_bs = signal.butter(4, [bandstop_low, bandstop_high], btype='bandstop')
# 应用滤波器
filtered_lp = signal.filtfilt(b_lp, a_lp, signal_noisy)
filtered_hp = signal.filtfilt(b_hp, a_hp, signal_noisy)
filtered_bp = signal.filtfilt(b_bp, a_bp, signal_noisy)
filtered_bs = signal.filtfilt(b_bs, a_bs, signal_noisy)
# 可视化滤波结果
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
# 原始信号和含噪信号
axes[0, 0].plot(t[:1000], signal_clean[:1000], 'g-', linewidth=2, label='原始信号')
axes[0, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.7, linewidth=1, label='含噪信号')
axes[0, 0].set_title('原始信号 vs 含噪信号')
axes[0, 0].set_xlabel('时间 (s)')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 低通滤波
axes[0, 1].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='含噪信号')
axes[0, 1].plot(t[:1000], filtered_lp[:1000], 'b-', linewidth=2, label='低通滤波')
axes[0, 1].set_title(f'低通滤波器 (截止: {30} Hz)')
axes[0, 1].set_xlabel('时间 (s)')
axes[0, 1].set_ylabel('幅度')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 高通滤波
axes[1, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='含噪信号')
axes[1, 0].plot(t[:1000], filtered_hp[:1000], 'g-', linewidth=2, label='高通滤波')
axes[1, 0].set_title(f'高通滤波器 (截止: {10} Hz)')
axes[1, 0].set_xlabel('时间 (s)')
axes[1, 0].set_ylabel('幅度')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 带通滤波
axes[1, 1].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='含噪信号')
axes[1, 1].plot(t[:1000], filtered_bp[:1000], 'm-', linewidth=2, label='带通滤波')
axes[1, 1].set_title(f'带通滤波器 ({20}-{40} Hz)')
axes[1, 1].set_xlabel('时间 (s)')
axes[1, 1].set_ylabel('幅度')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# 带阻滤波
axes[2, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='含噪信号')
axes[2, 0].plot(t[:1000], filtered_bs[:1000], 'c-', linewidth=2, label='带阻滤波')
axes[2, 0].set_title(f'带阻滤波器 ({55}-{65} Hz)')
axes[2, 0].set_xlabel('时间 (s)')
axes[2, 0].set_ylabel('幅度')
axes[2, 0].legend()
axes[2, 0].grid(True, alpha=0.3)
# 频谱比较
freqs = fftfreq(len(signal_noisy), 1/fs)[:len(signal_noisy)//2]
spectrum_original = np.abs(fft(signal_noisy))[:len(signal_noisy)//2]
spectrum_filtered = np.abs(fft(filtered_lp))[:len(signal_noisy)//2]
axes[2, 1].plot(freqs, spectrum_original, 'r-', alpha=0.7, linewidth=1, label='原始频谱')
axes[2, 1].plot(freqs, spectrum_filtered, 'b-', linewidth=2, label='滤波后频谱')
axes[2, 1].set_title('频谱比较 (低通滤波)')
axes[2, 1].set_xlabel('频率 (Hz)')
axes[2, 1].set_ylabel('幅度')
axes[2, 1].set_xlim(0, 150)
axes[2, 1].legend()
axes[2, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()2. FIR滤波器设计
python
# 设计FIR滤波器
from scipy.signal import firwin, freqz
# FIR滤波器参数
filter_length = 101 # 滤波器长度(奇数)
# 设计不同类型的FIR滤波器
# 1. 低通FIR滤波器
fir_lp = firwin(filter_length, 30, fs=fs, pass_zero='lowpass')
# 2. 高通FIR滤波器
fir_hp = firwin(filter_length, 10, fs=fs, pass_zero='highpass')
# 3. 带通FIR滤波器
fir_bp = firwin(filter_length, [20, 40], fs=fs, pass_zero='bandpass')
# 4. 带阻FIR滤波器
fir_bs = firwin(filter_length, [55, 65], fs=fs, pass_zero='bandstop')
# 计算频率响应
filters = [
(fir_lp, '低通FIR', 'blue'),
(fir_hp, '高通FIR', 'green'),
(fir_bp, '带通FIR', 'magenta'),
(fir_bs, '带阻FIR', 'cyan')
]
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
for i, (fir_coeff, name, color) in enumerate(filters):
# 计算频率响应
w, h = freqz(fir_coeff, worN=8000, fs=fs)
# 幅度响应
ax1 = axes[i//2, i%2]
ax1.plot(w, 20 * np.log10(np.abs(h)), color=color, linewidth=2)
ax1.set_title(f'{name} - 幅度响应')
ax1.set_xlabel('频率 (Hz)')
ax1.set_ylabel('幅度 (dB)')
ax1.grid(True, alpha=0.3)
ax1.set_xlim(0, 200)
ax1.set_ylim(-80, 5)
plt.tight_layout()
plt.show()
# 应用FIR滤波器并比较结果
filtered_fir_lp = signal.convolve(signal_noisy, fir_lp, mode='same')
filtered_iir_lp = signal.filtfilt(b_lp, a_lp, signal_noisy)
# 比较FIR和IIR滤波器
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 时域比较
axes[0, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='原始信号')
axes[0, 0].plot(t[:1000], filtered_fir_lp[:1000], 'b-', linewidth=2, label='FIR滤波')
axes[0, 0].plot(t[:1000], filtered_iir_lp[:1000], 'g--', linewidth=2, label='IIR滤波')
axes[0, 0].set_title('时域比较 - 低通滤波')
axes[0, 0].set_xlabel('时间 (s)')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 频域比较
freqs = fftfreq(len(signal_noisy), 1/fs)[:len(signal_noisy)//2]
spectrum_fir = np.abs(fft(filtered_fir_lp))[:len(signal_noisy)//2]
spectrum_iir = np.abs(fft(filtered_iir_lp))[:len(signal_noisy)//2]
axes[0, 1].plot(freqs, spectrum_fir, 'b-', linewidth=2, label='FIR滤波')
axes[0, 1].plot(freqs, spectrum_iir, 'g--', linewidth=2, label='IIR滤波')
axes[0, 1].set_title('频域比较 - 低通滤波')
axes[0, 1].set_xlabel('频率 (Hz)')
axes[0, 1].set_ylabel('幅度')
axes[0, 1].set_xlim(0, 100)
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 滤波器系数
axes[1, 0].stem(range(len(fir_lp)), fir_lp, basefmt=" ")
axes[1, 0].set_title('FIR滤波器系数')
axes[1, 0].set_xlabel('系数索引')
axes[1, 0].set_ylabel('系数值')
axes[1, 0].grid(True, alpha=0.3)
# 相位响应比较
w_fir, h_fir = freqz(fir_lp, worN=8000, fs=fs)
w_iir, h_iir = freqz(b_lp, a_lp, worN=8000, fs=fs)
axes[1, 1].plot(w_fir, np.angle(h_fir), 'b-', linewidth=2, label='FIR相位')
axes[1, 1].plot(w_iir, np.angle(h_iir), 'g--', linewidth=2, label='IIR相位')
axes[1, 1].set_title('相位响应比较')
axes[1, 1].set_xlabel('频率 (Hz)')
axes[1, 1].set_ylabel('相位 (弧度)')
axes[1, 1].set_xlim(0, 100)
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\nFIR vs IIR滤波器比较:")
print(f"FIR滤波器长度: {len(fir_lp)} 个系数")
print(f"IIR滤波器阶数: {len(b_lp)-1} (分子) + {len(a_lp)-1} (分母)")
print(f"FIR滤波器特点: 线性相位, 总是稳定")
print(f"IIR滤波器特点: 计算效率高, 可能不稳定")窗函数
1. 常用窗函数
python
# 窗函数长度
N = 256
# 生成各种窗函数
windows = {
'矩形窗': signal.boxcar(N),
'汉宁窗': signal.hann(N),
'汉明窗': signal.hamming(N),
'布莱克曼窗': signal.blackman(N),
'凯泽窗': signal.kaiser(N, beta=8.6),
'巴特利特窗': signal.bartlett(N),
'图基窗': signal.tukey(N, alpha=0.5)
}
# 可视化窗函数
fig, axes = plt.subplots(3, 3, figsize=(15, 12))
axes = axes.flatten()
for i, (name, window) in enumerate(windows.items()):
if i < len(axes):
axes[i].plot(window, 'b-', linewidth=2)
axes[i].set_title(name)
axes[i].set_xlabel('样本索引')
axes[i].set_ylabel('幅度')
axes[i].grid(True, alpha=0.3)
axes[i].set_ylim(0, 1.1)
# 窗函数频谱比较
axes[-2].clear()
for name, window in list(windows.items())[:4]: # 只显示前4个
# 计算频谱
spectrum = np.abs(fft(window, 2048))
freqs = np.linspace(0, 1, len(spectrum)//2)
spectrum_db = 20 * np.log10(spectrum[:len(spectrum)//2] / np.max(spectrum))
axes[-2].plot(freqs, spectrum_db, linewidth=2, label=name)
axes[-2].set_title('窗函数频谱比较')
axes[-2].set_xlabel('归一化频率')
axes[-2].set_ylabel('幅度 (dB)')
axes[-2].set_ylim(-120, 10)
axes[-2].legend()
axes[-2].grid(True, alpha=0.3)
# 窗函数参数比较
axes[-1].clear()
window_params = []
for name, window in windows.items():
# 计算窗函数参数
coherent_gain = np.mean(window)
processing_gain = np.sum(window**2) / len(window)
scalloping_loss = -20 * np.log10(np.mean(window))
window_params.append([name, coherent_gain, processing_gain, scalloping_loss])
# 显示参数表格
axes[-1].axis('tight')
axes[-1].axis('off')
table_data = [[p[0], f'{p[1]:.3f}', f'{p[2]:.3f}', f'{p[3]:.2f}'] for p in window_params]
headers = ['窗函数', '相干增益', '处理增益', '扇贝损失(dB)']
table = axes[-1].table(cellText=table_data, colLabels=headers,
cellLoc='center', loc='center')
table.auto_set_font_size(False)
table.set_fontsize(9)
table.scale(1.2, 1.5)
plt.tight_layout()
plt.show()
print("\n窗函数特性分析:")
for name, cg, pg, sl in window_params:
print(f"{name:12s}: 相干增益={cg:.3f}, 处理增益={pg:.3f}, 扇贝损失={sl:.2f}dB")2. 窗函数在频谱分析中的应用
python
# 创建测试信号:两个接近的正弦波
fs = 1000
t = np.linspace(0, 2, 2000, endpoint=False)
f1, f2 = 50, 52 # 两个接近的频率
signal_test = np.sin(2*np.pi*f1*t) + 0.8*np.sin(2*np.pi*f2*t)
# 使用不同窗函数进行频谱分析
window_types = ['boxcar', 'hann', 'blackman', 'kaiser']
window_params = [None, None, None, 8.6] # kaiser窗的beta参数
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.flatten()
for i, (window_type, param) in enumerate(zip(window_types, window_params)):
# 选择信号段进行分析
segment_length = 512
start_idx = 500
signal_segment = signal_test[start_idx:start_idx+segment_length]
# 应用窗函数
if window_type == 'kaiser':
window = signal.get_window((window_type, param), segment_length)
else:
window = signal.get_window(window_type, segment_length)
windowed_signal = signal_segment * window
# 计算频谱
spectrum = fft(windowed_signal, 2048)
freqs = fftfreq(2048, 1/fs)[:1024]
magnitude_db = 20 * np.log10(np.abs(spectrum[:1024]) / np.max(np.abs(spectrum)))
# 绘制频谱
axes[i].plot(freqs, magnitude_db, 'b-', linewidth=1.5)
axes[i].axvline(x=f1, color='r', linestyle='--', alpha=0.7, label=f'{f1} Hz')
axes[i].axvline(x=f2, color='g', linestyle='--', alpha=0.7, label=f'{f2} Hz')
axes[i].set_title(f'{window_type.capitalize()} 窗函数')
axes[i].set_xlabel('频率 (Hz)')
axes[i].set_ylabel('幅度 (dB)')
axes[i].set_xlim(40, 60)
axes[i].set_ylim(-80, 5)
axes[i].legend()
axes[i].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# 频率分辨率分析
print("\n窗函数对频率分辨率的影响:")
for window_type, param in zip(window_types, window_params):
if window_type == 'kaiser':
window = signal.get_window((window_type, param), segment_length)
else:
window = signal.get_window(window_type, segment_length)
# 计算等效噪声带宽
enbw = fs * np.sum(window**2) / (np.sum(window)**2)
print(f"{window_type.capitalize():12s}: ENBW = {enbw:.3f} Hz")卷积和相关
1. 卷积运算
python
# 创建测试信号
t1 = np.linspace(0, 2, 200, endpoint=False)
signal1 = np.exp(-t1) * np.sin(2*np.pi*5*t1) # 衰减正弦波
t2 = np.linspace(0, 1, 100, endpoint=False)
signal2 = signal.gaussian(100, std=10) # 高斯脉冲
# 不同类型的卷积
conv_full = signal.convolve(signal1, signal2, mode='full')
conv_same = signal.convolve(signal1, signal2, mode='same')
conv_valid = signal.convolve(signal1, signal2, mode='valid')
# 可视化卷积结果
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 原始信号
axes[0, 0].plot(t1, signal1, 'b-', linewidth=2, label='信号1')
axes[0, 0].plot(t2, signal2, 'r-', linewidth=2, label='信号2')
axes[0, 0].set_title('原始信号')
axes[0, 0].set_xlabel('时间')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Full卷积
axes[0, 1].plot(conv_full, 'g-', linewidth=2)
axes[0, 1].set_title(f'Full卷积 (长度: {len(conv_full)})')
axes[0, 1].set_xlabel('样本索引')
axes[0, 1].set_ylabel('幅度')
axes[0, 1].grid(True, alpha=0.3)
# Same卷积
axes[1, 0].plot(conv_same, 'm-', linewidth=2)
axes[1, 0].set_title(f'Same卷积 (长度: {len(conv_same)})')
axes[1, 0].set_xlabel('样本索引')
axes[1, 0].set_ylabel('幅度')
axes[1, 0].grid(True, alpha=0.3)
# Valid卷积
axes[1, 1].plot(conv_valid, 'c-', linewidth=2)
axes[1, 1].set_title(f'Valid卷积 (长度: {len(conv_valid)})')
axes[1, 1].set_xlabel('样本索引')
axes[1, 1].set_ylabel('幅度')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("卷积结果长度:")
print(f"信号1长度: {len(signal1)}")
print(f"信号2长度: {len(signal2)}")
print(f"Full卷积: {len(conv_full)} = {len(signal1)} + {len(signal2)} - 1")
print(f"Same卷积: {len(conv_same)} = max({len(signal1)}, {len(signal2)})")
print(f"Valid卷积: {len(conv_valid)} = |{len(signal1)} - {len(signal2)}| + 1")2. 相关分析
python
# 创建相关分析的测试信号
np.random.seed(42)
t = np.linspace(0, 10, 1000)
# 原始信号
original_signal = np.sin(2*np.pi*2*t) + 0.5*np.sin(2*np.pi*5*t)
# 延迟信号
delay_samples = 50
delayed_signal = np.zeros_like(original_signal)
delayed_signal[delay_samples:] = original_signal[:-delay_samples]
# 添加噪声
noisy_signal = original_signal + 0.3*np.random.normal(0, 1, len(t))
# 计算不同类型的相关
# 1. 自相关
autocorr = signal.correlate(original_signal, original_signal, mode='full')
autocorr_lags = signal.correlation_lags(len(original_signal), len(original_signal), mode='full')
# 2. 互相关
crosscorr = signal.correlate(original_signal, delayed_signal, mode='full')
crosscorr_lags = signal.correlation_lags(len(original_signal), len(delayed_signal), mode='full')
# 3. 噪声信号的相关
noise_corr = signal.correlate(original_signal, noisy_signal, mode='full')
noise_corr_lags = signal.correlation_lags(len(original_signal), len(noisy_signal), mode='full')
# 可视化相关分析结果
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
# 原始信号
axes[0, 0].plot(t[:200], original_signal[:200], 'b-', linewidth=2, label='原始信号')
axes[0, 0].plot(t[:200], delayed_signal[:200], 'r--', linewidth=2, label='延迟信号')
axes[0, 0].set_title('信号比较')
axes[0, 0].set_xlabel('时间')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 自相关
axes[0, 1].plot(autocorr_lags, autocorr, 'g-', linewidth=2)
axes[0, 1].set_title('自相关函数')
axes[0, 1].set_xlabel('滞后 (样本)')
axes[0, 1].set_ylabel('相关值')
axes[0, 1].set_xlim(-200, 200)
axes[0, 1].grid(True, alpha=0.3)
# 互相关
axes[1, 0].plot(crosscorr_lags, crosscorr, 'm-', linewidth=2)
max_corr_idx = np.argmax(crosscorr)
max_lag = crosscorr_lags[max_corr_idx]
axes[1, 0].axvline(x=max_lag, color='r', linestyle='--',
label=f'最大相关滞后: {max_lag}')
axes[1, 0].set_title('互相关函数')
axes[1, 0].set_xlabel('滞后 (样本)')
axes[1, 0].set_ylabel('相关值')
axes[1, 0].set_xlim(-200, 200)
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 含噪信号相关
axes[1, 1].plot(t[:200], original_signal[:200], 'b-', linewidth=2, label='原始信号')
axes[1, 1].plot(t[:200], noisy_signal[:200], 'r-', alpha=0.7, linewidth=1, label='含噪信号')
axes[1, 1].set_title('含噪信号比较')
axes[1, 1].set_xlabel('时间')
axes[1, 1].set_ylabel('幅度')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# 含噪信号相关函数
axes[2, 0].plot(noise_corr_lags, noise_corr, 'c-', linewidth=2)
axes[2, 0].set_title('含噪信号相关函数')
axes[2, 0].set_xlabel('滞后 (样本)')
axes[2, 0].set_ylabel('相关值')
axes[2, 0].set_xlim(-200, 200)
axes[2, 0].grid(True, alpha=0.3)
# 相关系数计算
corr_coeff = np.corrcoef(original_signal, noisy_signal)[0, 1]
axes[2, 1].scatter(original_signal[::10], noisy_signal[::10], alpha=0.6)
axes[2, 1].set_title(f'散点图 (相关系数: {corr_coeff:.3f})')
axes[2, 1].set_xlabel('原始信号')
axes[2, 1].set_ylabel('含噪信号')
axes[2, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\n相关分析结果:")
print(f"检测到的延迟: {max_lag} 样本 (实际延迟: {delay_samples} 样本)")
print(f"原始信号与含噪信号的相关系数: {corr_coeff:.3f}")
print(f"自相关最大值位置: {autocorr_lags[np.argmax(autocorr)]} (应该为0)")实际应用案例
1. 心电图(ECG)信号处理
python
# 模拟ECG信号
def generate_ecg_signal(duration=10, fs=1000, heart_rate=72):
"""生成模拟ECG信号"""
t = np.linspace(0, duration, int(fs * duration), endpoint=False)
# 基本心跳周期
beat_period = 60 / heart_rate # 秒
ecg = np.zeros_like(t)
for beat_time in np.arange(0, duration, beat_period):
# P波
p_center = beat_time + 0.1
p_width = 0.05
p_amplitude = 0.1
# QRS复合波
qrs_center = beat_time + 0.2
qrs_width = 0.04
qrs_amplitude = 1.0
# T波
t_center = beat_time + 0.4
t_width = 0.08
t_amplitude = 0.3
# 添加波形
for i, time_point in enumerate(t):
# P波
if abs(time_point - p_center) < p_width:
ecg[i] += p_amplitude * np.exp(-((time_point - p_center) / (p_width/3))**2)
# QRS波
if abs(time_point - qrs_center) < qrs_width:
ecg[i] += qrs_amplitude * np.exp(-((time_point - qrs_center) / (qrs_width/4))**2)
# T波
if abs(time_point - t_center) < t_width:
ecg[i] += t_amplitude * np.exp(-((time_point - t_center) / (t_width/3))**2)
return t, ecg
# 生成ECG信号
t_ecg, ecg_clean = generate_ecg_signal(duration=5, fs=1000, heart_rate=75)
# 添加噪声和干扰
np.random.seed(42)
# 基线漂移 (低频)
baseline_drift = 0.1 * np.sin(2*np.pi*0.5*t_ecg)
# 肌电干扰 (高频)
emg_noise = 0.05 * np.random.normal(0, 1, len(t_ecg))
# 工频干扰 (50Hz)
power_line_noise = 0.03 * np.sin(2*np.pi*50*t_ecg)
ecg_noisy = ecg_clean + baseline_drift + emg_noise + power_line_noise
# ECG信号滤波处理
fs_ecg = 1000
nyquist_ecg = fs_ecg / 2
# 1. 高通滤波器 - 去除基线漂移
hp_cutoff = 0.5 / nyquist_ecg
b_hp_ecg, a_hp_ecg = signal.butter(4, hp_cutoff, btype='high')
ecg_hp = signal.filtfilt(b_hp_ecg, a_hp_ecg, ecg_noisy)
# 2. 低通滤波器 - 去除高频噪声
lp_cutoff = 40 / nyquist_ecg
b_lp_ecg, a_lp_ecg = signal.butter(4, lp_cutoff, btype='low')
ecg_lp = signal.filtfilt(b_lp_ecg, a_lp_ecg, ecg_hp)
# 3. 陷波滤波器 - 去除工频干扰
notch_freq = 50 / nyquist_ecg
Q = 30 # 品质因子
b_notch, a_notch = signal.iirnotch(notch_freq, Q)
ecg_filtered = signal.filtfilt(b_notch, a_notch, ecg_lp)
# R波检测
from scipy.signal import find_peaks
# 使用峰值检测算法
peaks, properties = find_peaks(ecg_filtered,
height=0.5, # 最小高度
distance=int(0.6*fs_ecg), # 最小间距(对应100bpm)
prominence=0.3) # 显著性
# 计算心率
rr_intervals = np.diff(peaks) / fs_ecg # R-R间期(秒)
heart_rates = 60 / rr_intervals # 瞬时心率(bpm)
avg_heart_rate = np.mean(heart_rates)
# 可视化ECG处理结果
fig, axes = plt.subplots(4, 1, figsize=(15, 12))
# 原始和含噪ECG
axes[0].plot(t_ecg, ecg_clean, 'g-', linewidth=2, label='原始ECG')
axes[0].plot(t_ecg, ecg_noisy, 'r-', alpha=0.7, linewidth=1, label='含噪ECG')
axes[0].set_title('ECG信号对比')
axes[0].set_ylabel('幅度 (mV)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].set_xlim(0, 3)
# 滤波过程
axes[1].plot(t_ecg, ecg_noisy, 'r-', alpha=0.5, linewidth=1, label='含噪ECG')
axes[1].plot(t_ecg, ecg_hp, 'b-', alpha=0.7, linewidth=1, label='高通滤波')
axes[1].plot(t_ecg, ecg_lp, 'm-', alpha=0.7, linewidth=1, label='低通滤波')
axes[1].plot(t_ecg, ecg_filtered, 'g-', linewidth=2, label='最终滤波')
axes[1].set_title('ECG滤波处理过程')
axes[1].set_ylabel('幅度 (mV)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].set_xlim(0, 3)
# R波检测结果
axes[2].plot(t_ecg, ecg_filtered, 'b-', linewidth=2, label='滤波后ECG')
axes[2].plot(t_ecg[peaks], ecg_filtered[peaks], 'ro', markersize=8, label='检测到的R波')
axes[2].set_title(f'R波检测 (检测到 {len(peaks)} 个R波)')
axes[2].set_ylabel('幅度 (mV)')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
axes[2].set_xlim(0, 5)
# 心率变化
if len(heart_rates) > 0:
axes[3].plot(t_ecg[peaks[1:]], heart_rates, 'ro-', linewidth=2, markersize=6)
axes[3].axhline(y=avg_heart_rate, color='g', linestyle='--',
label=f'平均心率: {avg_heart_rate:.1f} bpm')
axes[3].set_title('瞬时心率变化')
axes[3].set_xlabel('时间 (s)')
axes[3].set_ylabel('心率 (bpm)')
axes[3].legend()
axes[3].grid(True, alpha=0.3)
axes[3].set_ylim(60, 90)
plt.tight_layout()
plt.show()
print("\nECG信号分析结果:")
print(f"检测到的R波数量: {len(peaks)}")
print(f"平均心率: {avg_heart_rate:.1f} bpm")
print(f"心率变异性 (RMSSD): {np.sqrt(np.mean(np.diff(rr_intervals)**2))*1000:.1f} ms")
print(f"R-R间期范围: {np.min(rr_intervals):.3f} - {np.max(rr_intervals):.3f} s")2. 音频信号处理
python
# 生成音频信号示例
def generate_audio_signal(duration=3, fs=44100):
"""生成模拟音频信号"""
t = np.linspace(0, duration, int(fs * duration), endpoint=False)
# 基频和谐波
fundamental = 440 # A4音符
audio = (np.sin(2*np.pi*fundamental*t) + # 基频
0.5*np.sin(2*np.pi*2*fundamental*t) + # 二次谐波
0.3*np.sin(2*np.pi*3*fundamental*t) + # 三次谐波
0.2*np.sin(2*np.pi*4*fundamental*t)) # 四次谐波
# 添加包络
envelope = np.exp(-t/2) # 指数衰减
audio *= envelope
return t, audio
# 生成音频信号
t_audio, audio_clean = generate_audio_signal(duration=2, fs=8000)
fs_audio = 8000
# 添加噪声和失真
np.random.seed(42)
audio_noise = 0.1 * np.random.normal(0, 1, len(t_audio))
audio_noisy = audio_clean + audio_noise
# 音频信号处理
# 1. 降噪滤波
nyquist_audio = fs_audio / 2
lp_cutoff_audio = 4000 / nyquist_audio # 低通滤波
b_lp_audio, a_lp_audio = signal.butter(6, lp_cutoff_audio, btype='low')
audio_filtered = signal.filtfilt(b_lp_audio, a_lp_audio, audio_noisy)
# 2. 频谱分析
f_audio, t_audio_stft, Zxx_audio = signal.stft(audio_filtered, fs=fs_audio,
window='hann', nperseg=512, noverlap=256)
# 3. 音频特征提取
# 计算频谱质心
spectral_centroids = []
for i in range(Zxx_audio.shape[1]):
magnitude = np.abs(Zxx_audio[:, i])
if np.sum(magnitude) > 0:
centroid = np.sum(f_audio * magnitude) / np.sum(magnitude)
spectral_centroids.append(centroid)
else:
spectral_centroids.append(0)
# 计算过零率
def zero_crossing_rate(signal_data, frame_length=512, hop_length=256):
"""计算过零率"""
zcr = []
for i in range(0, len(signal_data) - frame_length, hop_length):
frame = signal_data[i:i+frame_length]
crossings = np.sum(np.abs(np.diff(np.sign(frame)))) / 2
zcr.append(crossings / frame_length)
return np.array(zcr)
zcr = zero_crossing_rate(audio_filtered)
# 可视化音频处理结果
fig, axes = plt.subplots(4, 1, figsize=(15, 12))
# 原始和处理后的音频
axes[0].plot(t_audio[:8000], audio_clean[:8000], 'g-', linewidth=1, label='原始音频')
axes[0].plot(t_audio[:8000], audio_noisy[:8000], 'r-', alpha=0.7, linewidth=1, label='含噪音频')
axes[0].plot(t_audio[:8000], audio_filtered[:8000], 'b-', linewidth=1.5, label='滤波后')
axes[0].set_title('音频信号处理')
axes[0].set_ylabel('幅度')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 频谱图
im = axes[1].pcolormesh(t_audio_stft, f_audio, 20*np.log10(np.abs(Zxx_audio)),
shading='gouraud', cmap='viridis')
axes[1].set_title('音频频谱图')
axes[1].set_ylabel('频率 (Hz)')
axes[1].set_ylim(0, 2000)
plt.colorbar(im, ax=axes[1], label='幅度 (dB)')
# 频谱质心
axes[2].plot(t_audio_stft, spectral_centroids, 'r-', linewidth=2)
axes[2].set_title('频谱质心变化')
axes[2].set_ylabel('频率 (Hz)')
axes[2].grid(True, alpha=0.3)
# 过零率
zcr_time = np.arange(len(zcr)) * 256 / fs_audio
axes[3].plot(zcr_time, zcr, 'b-', linewidth=2)
axes[3].set_title('过零率')
axes[3].set_xlabel('时间 (s)')
axes[3].set_ylabel('过零率')
axes[3].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\n音频信号分析结果:")
print(f"平均频谱质心: {np.mean(spectral_centroids):.1f} Hz")
print(f"平均过零率: {np.mean(zcr):.4f}")
print(f"信号持续时间: {len(t_audio)/fs_audio:.2f} 秒")
print(f"采样频率: {fs_audio} Hz")性能优化技巧
1. 算法选择和参数优化
python
import time
# 比较不同卷积算法的性能
def benchmark_convolution():
"""比较不同卷积算法的性能"""
# 创建测试信号
signal_length = 10000
kernel_length = 100
test_signal = np.random.randn(signal_length)
test_kernel = np.random.randn(kernel_length)
methods = {
'direct': lambda: signal.convolve(test_signal, test_kernel, method='direct'),
'fft': lambda: signal.convolve(test_signal, test_kernel, method='fft'),
'auto': lambda: signal.convolve(test_signal, test_kernel, method='auto')
}
results = {}
for name, method in methods.items():
# 预热
method()
# 计时
start_time = time.time()
for _ in range(10):
result = method()
end_time = time.time()
results[name] = (end_time - start_time) / 10
return results
# 运行性能测试
conv_times = benchmark_convolution()
print("卷积算法性能比较:")
for method, time_taken in conv_times.items():
print(f"{method:8s}: {time_taken*1000:.2f} ms")
# 最快的方法
fastest_method = min(conv_times, key=conv_times.get)
print(f"\n最快方法: {fastest_method}")2. 内存优化
python
# 大信号处理的内存优化策略
def process_large_signal_chunked(signal_data, chunk_size=1024, overlap=128):
"""分块处理大信号以节省内存"""
processed_chunks = []
for i in range(0, len(signal_data) - overlap, chunk_size - overlap):
# 提取块
chunk = signal_data[i:i + chunk_size]
# 处理块(这里以低通滤波为例)
b, a = signal.butter(4, 0.1)
processed_chunk = signal.filtfilt(b, a, chunk)
# 去除重叠部分(除了第一块)
if i == 0:
processed_chunks.append(processed_chunk)
else:
processed_chunks.append(processed_chunk[overlap//2:])
return np.concatenate(processed_chunks)
# 示例:处理大信号
large_signal = np.random.randn(50000)
processed_signal = process_large_signal_chunked(large_signal)
print(f"\n大信号处理:")
print(f"原始信号长度: {len(large_signal)}")
print(f"处理后信号长度: {len(processed_signal)}")
print(f"内存使用优化: 分块处理")小结
本章详细介绍了 SciPy 的信号处理功能,主要内容包括:
核心概念
- 信号生成: 掌握了各种基本波形的生成方法
- 频谱分析: 学习了FFT、STFT等频域分析技术
- 数字滤波: 理解了IIR和FIR滤波器的设计与应用
- 窗函数: 了解了不同窗函数对频谱分析的影响
- 卷积相关: 掌握了信号的卷积和相关运算
实际应用
- 生物医学信号: ECG信号的滤波和特征提取
- 音频处理: 音频信号的降噪和特征分析
- 性能优化: 大信号处理的内存和计算优化
最佳实践
- 选择合适的滤波器类型:根据应用需求选择IIR或FIR
- 窗函数的选择:平衡频率分辨率和泄漏抑制
- 参数调优:根据信号特性调整滤波器参数
- 性能考虑:对于大数据量采用分块处理策略
练习题
基础练习
信号生成练习
- 生成一个包含多个频率成分的复合信号
- 添加不同类型的噪声
- 使用FFT分析其频谱特性
滤波器设计练习
- 设计一个带通滤波器去除特定频率的干扰
- 比较Butterworth、Chebyshev和Elliptic滤波器的性能
- 分析滤波器的幅度和相位响应
窗函数应用练习
- 使用不同窗函数分析同一信号的频谱
- 比较各窗函数的频率分辨率和泄漏特性
- 选择最适合特定应用的窗函数
进阶练习
实际信号处理项目
- 处理一段真实的音频信号(如语音或音乐)
- 实现噪声抑制算法
- 提取音频的特征参数
自适应滤波练习
- 实现一个简单的自适应滤波器
- 用于消除未知特性的干扰信号
- 分析收敛性能和稳态误差
通过这些练习,你将能够熟练运用SciPy进行各种信号处理任务,为后续的高级应用打下坚实基础。