Skip to content

信号处理

SciPy 的 scipy.signal 模块提供了丰富的信号处理功能,包括滤波器设计、频谱分析、信号生成、卷积运算等。无论是处理音频信号、生物医学信号,还是进行通信系统分析,这个模块都提供了强大的工具。本章将详细介绍如何使用 SciPy 进行各种信号处理任务。

scipy.signal 模块概述

scipy.signal 模块包含以下主要功能:

  • 信号生成和波形创建
  • 数字滤波器设计和应用
  • 频谱分析和变换
  • 卷积和相关运算
  • 窗函数和谱估计
  • 系统分析和控制理论
python
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
import warnings
warnings.filterwarnings('ignore')

# 设置绘图样式
plt.style.use('seaborn-v0_8')
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# 查看 signal 模块的主要功能
print("scipy.signal 主要功能:")
functions = [attr for attr in dir(signal) if not attr.startswith('_')]
print(f"总共 {len(functions)} 个函数和类")
print("部分函数:", functions[:20])

信号生成

1. 基本波形生成

python
# 时间轴设置
fs = 1000  # 采样频率 (Hz)
T = 2.0    # 信号持续时间 (秒)
t = np.linspace(0, T, int(fs * T), endpoint=False)

# 生成各种基本波形
freq1, freq2, freq3 = 10, 25, 50  # 频率 (Hz)

# 正弦波
sine_wave = np.sin(2 * np.pi * freq1 * t)

# 方波
square_wave = signal.square(2 * np.pi * freq2 * t)

# 锯齿波
sawtooth_wave = signal.sawtooth(2 * np.pi * freq3 * t)

# 三角波
triangle_wave = signal.sawtooth(2 * np.pi * freq2 * t, 0.5)

# 啁啾信号(频率扫描)
chirp_signal = signal.chirp(t, f0=1, f1=100, t1=T, method='linear')

# 高斯脉冲
gaussian_pulse = signal.gausspulse(t - T/2, fc=freq2)

# 可视化基本波形
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
axes = axes.flatten()

waveforms = [
    (sine_wave, f'正弦波 ({freq1} Hz)'),
    (square_wave, f'方波 ({freq2} Hz)'),
    (sawtooth_wave, f'锯齿波 ({freq3} Hz)'),
    (triangle_wave, f'三角波 ({freq2} Hz)'),
    (chirp_signal, '啁啾信号 (1-100 Hz)'),
    (gaussian_pulse, f'高斯脉冲 ({freq2} Hz)')
]

for i, (wave, title) in enumerate(waveforms):
    axes[i].plot(t[:500], wave[:500], 'b-', linewidth=1.5)  # 只显示前0.5秒
    axes[i].set_title(title)
    axes[i].set_xlabel('时间 (s)')
    axes[i].set_ylabel('幅度')
    axes[i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("基本波形生成完成")
print(f"采样频率: {fs} Hz")
print(f"信号长度: {len(t)} 个采样点")
print(f"时间分辨率: {1/fs:.3f} s")

2. 复合信号和噪声

python
# 创建复合信号
np.random.seed(42)

# 多频率正弦波叠加
composite_signal = (np.sin(2 * np.pi * 5 * t) + 
                   0.5 * np.sin(2 * np.pi * 15 * t) + 
                   0.3 * np.sin(2 * np.pi * 30 * t))

# 添加噪声
noise_level = 0.2
noise = np.random.normal(0, noise_level, len(t))
noisy_signal = composite_signal + noise

# AM调制信号
carrier_freq = 100  # 载波频率
modulation_freq = 5  # 调制频率
modulation_depth = 0.5
am_signal = (1 + modulation_depth * np.sin(2 * np.pi * modulation_freq * t)) * \
           np.sin(2 * np.pi * carrier_freq * t)

# FM调制信号
frequency_deviation = 20  # 频率偏移
fm_signal = np.sin(2 * np.pi * carrier_freq * t + 
                  frequency_deviation * np.sin(2 * np.pi * modulation_freq * t))

# 可视化复合信号
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# 复合信号
axes[0, 0].plot(t[:1000], composite_signal[:1000], 'b-', linewidth=1, label='原始信号')
axes[0, 0].plot(t[:1000], noisy_signal[:1000], 'r-', alpha=0.7, linewidth=1, label='含噪信号')
axes[0, 0].set_title('复合信号 (5Hz + 15Hz + 30Hz)')
axes[0, 0].set_xlabel('时间 (s)')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# AM调制
axes[0, 1].plot(t[:2000], am_signal[:2000], 'g-', linewidth=1)
axes[0, 1].set_title(f'AM调制信号 (载波: {carrier_freq}Hz, 调制: {modulation_freq}Hz)')
axes[0, 1].set_xlabel('时间 (s)')
axes[0, 1].set_ylabel('幅度')
axes[0, 1].grid(True, alpha=0.3)

# FM调制
axes[1, 0].plot(t[:2000], fm_signal[:2000], 'm-', linewidth=1)
axes[1, 0].set_title(f'FM调制信号 (载波: {carrier_freq}Hz, 调制: {modulation_freq}Hz)')
axes[1, 0].set_xlabel('时间 (s)')
axes[1, 0].set_ylabel('幅度')
axes[1, 0].grid(True, alpha=0.3)

# 噪声特性
axes[1, 1].hist(noise, bins=50, alpha=0.7, density=True, edgecolor='black')
axes[1, 1].set_title(f'噪声分布 (σ = {noise_level})')
axes[1, 1].set_xlabel('幅度')
axes[1, 1].set_ylabel('概率密度')
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

频谱分析

1. 傅里叶变换

python
# 计算频谱
def compute_spectrum(signal_data, sampling_rate):
    """计算信号的频谱"""
    N = len(signal_data)
    frequencies = fftfreq(N, 1/sampling_rate)[:N//2]
    spectrum = fft(signal_data)[:N//2]
    magnitude = np.abs(spectrum)
    phase = np.angle(spectrum)
    power = magnitude**2
    
    return frequencies, magnitude, phase, power

# 分析不同信号的频谱
signals_to_analyze = [
    (composite_signal, '复合信号'),
    (noisy_signal, '含噪信号'),
    (am_signal, 'AM调制信号'),
    (fm_signal, 'FM调制信号')
]

fig, axes = plt.subplots(4, 2, figsize=(15, 16))

for i, (sig, name) in enumerate(signals_to_analyze):
    freqs, mag, phase, power = compute_spectrum(sig, fs)
    
    # 时域信号
    axes[i, 0].plot(t[:1000], sig[:1000], 'b-', linewidth=1)
    axes[i, 0].set_title(f'{name} - 时域')
    axes[i, 0].set_xlabel('时间 (s)')
    axes[i, 0].set_ylabel('幅度')
    axes[i, 0].grid(True, alpha=0.3)
    
    # 频域信号
    axes[i, 1].plot(freqs, mag, 'r-', linewidth=1)
    axes[i, 1].set_title(f'{name} - 频域')
    axes[i, 1].set_xlabel('频率 (Hz)')
    axes[i, 1].set_ylabel('幅度')
    axes[i, 1].set_xlim(0, 150)  # 只显示0-150Hz
    axes[i, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 频谱特征分析
print("\n频谱分析结果:")
for sig, name in signals_to_analyze:
    freqs, mag, _, power = compute_spectrum(sig, fs)
    
    # 找到主要频率成分
    peak_indices = signal.find_peaks(mag, height=np.max(mag)*0.1)[0]
    peak_freqs = freqs[peak_indices]
    peak_mags = mag[peak_indices]
    
    print(f"\n{name}:")
    print(f"  主要频率成分: {peak_freqs[:5]} Hz")
    print(f"  对应幅度: {peak_mags[:5]}")
    print(f"  总功率: {np.sum(power):.2f}")

2. 短时傅里叶变换 (STFT)

python
# 创建时变信号
t_stft = np.linspace(0, 4, 4000)
time_varying_signal = np.zeros_like(t_stft)

# 分段频率
for i, time_point in enumerate(t_stft):
    if time_point < 1:
        freq = 10
    elif time_point < 2:
        freq = 25
    elif time_point < 3:
        freq = 50
    else:
        freq = 75
    
    time_varying_signal[i] = np.sin(2 * np.pi * freq * time_point)

# 添加噪声
time_varying_signal += 0.1 * np.random.normal(0, 1, len(t_stft))

# 计算短时傅里叶变换
f_stft, t_stft_axis, Zxx = signal.stft(time_varying_signal, fs=1000, 
                                      window='hann', nperseg=256, noverlap=128)

# 可视化STFT结果
fig, axes = plt.subplots(3, 1, figsize=(15, 12))

# 原始信号
axes[0].plot(t_stft, time_varying_signal, 'b-', linewidth=1)
axes[0].set_title('时变信号 (频率: 10→25→50→75 Hz)')
axes[0].set_xlabel('时间 (s)')
axes[0].set_ylabel('幅度')
axes[0].grid(True, alpha=0.3)

# STFT幅度谱
im1 = axes[1].pcolormesh(t_stft_axis, f_stft, np.abs(Zxx), shading='gouraud', cmap='viridis')
axes[1].set_title('短时傅里叶变换 - 幅度谱')
axes[1].set_xlabel('时间 (s)')
axes[1].set_ylabel('频率 (Hz)')
axes[1].set_ylim(0, 100)
plt.colorbar(im1, ax=axes[1], label='幅度')

# STFT相位谱
im2 = axes[2].pcolormesh(t_stft_axis, f_stft, np.angle(Zxx), shading='gouraud', cmap='hsv')
axes[2].set_title('短时傅里叶变换 - 相位谱')
axes[2].set_xlabel('时间 (s)')
axes[2].set_ylabel('频率 (Hz)')
axes[2].set_ylim(0, 100)
plt.colorbar(im2, ax=axes[2], label='相位 (弧度)')

plt.tight_layout()
plt.show()

print("\n短时傅里叶变换分析:")
print(f"时间分辨率: {t_stft_axis[1] - t_stft_axis[0]:.3f} s")
print(f"频率分辨率: {f_stft[1] - f_stft[0]:.3f} Hz")
print(f"窗长度: 256 个采样点")
print(f"重叠: 128 个采样点 (50%)")

数字滤波器

1. IIR滤波器设计

python
# 创建测试信号
fs = 1000
t = np.linspace(0, 2, 2000, endpoint=False)

# 多频率信号
signal_clean = (np.sin(2*np.pi*5*t) +      # 5 Hz - 保留
               0.5*np.sin(2*np.pi*25*t) +   # 25 Hz - 保留  
               0.3*np.sin(2*np.pi*60*t) +   # 60 Hz - 滤除
               0.2*np.sin(2*np.pi*120*t))   # 120 Hz - 滤除

# 添加噪声
noise = 0.1 * np.random.normal(0, 1, len(t))
signal_noisy = signal_clean + noise

# 设计不同类型的IIR滤波器
nyquist = fs / 2

# 1. 低通滤波器 (截止频率: 30 Hz)
lowpass_cutoff = 30 / nyquist
b_lp, a_lp = signal.butter(4, lowpass_cutoff, btype='low')

# 2. 高通滤波器 (截止频率: 10 Hz)
highpass_cutoff = 10 / nyquist
b_hp, a_hp = signal.butter(4, highpass_cutoff, btype='high')

# 3. 带通滤波器 (通带: 20-40 Hz)
bandpass_low = 20 / nyquist
bandpass_high = 40 / nyquist
b_bp, a_bp = signal.butter(4, [bandpass_low, bandpass_high], btype='band')

# 4. 带阻滤波器 (阻带: 55-65 Hz)
bandstop_low = 55 / nyquist
bandstop_high = 65 / nyquist
b_bs, a_bs = signal.butter(4, [bandstop_low, bandstop_high], btype='bandstop')

# 应用滤波器
filtered_lp = signal.filtfilt(b_lp, a_lp, signal_noisy)
filtered_hp = signal.filtfilt(b_hp, a_hp, signal_noisy)
filtered_bp = signal.filtfilt(b_bp, a_bp, signal_noisy)
filtered_bs = signal.filtfilt(b_bs, a_bs, signal_noisy)

# 可视化滤波结果
fig, axes = plt.subplots(3, 2, figsize=(15, 12))

# 原始信号和含噪信号
axes[0, 0].plot(t[:1000], signal_clean[:1000], 'g-', linewidth=2, label='原始信号')
axes[0, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.7, linewidth=1, label='含噪信号')
axes[0, 0].set_title('原始信号 vs 含噪信号')
axes[0, 0].set_xlabel('时间 (s)')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# 低通滤波
axes[0, 1].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='含噪信号')
axes[0, 1].plot(t[:1000], filtered_lp[:1000], 'b-', linewidth=2, label='低通滤波')
axes[0, 1].set_title(f'低通滤波器 (截止: {30} Hz)')
axes[0, 1].set_xlabel('时间 (s)')
axes[0, 1].set_ylabel('幅度')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 高通滤波
axes[1, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='含噪信号')
axes[1, 0].plot(t[:1000], filtered_hp[:1000], 'g-', linewidth=2, label='高通滤波')
axes[1, 0].set_title(f'高通滤波器 (截止: {10} Hz)')
axes[1, 0].set_xlabel('时间 (s)')
axes[1, 0].set_ylabel('幅度')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# 带通滤波
axes[1, 1].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='含噪信号')
axes[1, 1].plot(t[:1000], filtered_bp[:1000], 'm-', linewidth=2, label='带通滤波')
axes[1, 1].set_title(f'带通滤波器 ({20}-{40} Hz)')
axes[1, 1].set_xlabel('时间 (s)')
axes[1, 1].set_ylabel('幅度')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

# 带阻滤波
axes[2, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='含噪信号')
axes[2, 0].plot(t[:1000], filtered_bs[:1000], 'c-', linewidth=2, label='带阻滤波')
axes[2, 0].set_title(f'带阻滤波器 ({55}-{65} Hz)')
axes[2, 0].set_xlabel('时间 (s)')
axes[2, 0].set_ylabel('幅度')
axes[2, 0].legend()
axes[2, 0].grid(True, alpha=0.3)

# 频谱比较
freqs = fftfreq(len(signal_noisy), 1/fs)[:len(signal_noisy)//2]
spectrum_original = np.abs(fft(signal_noisy))[:len(signal_noisy)//2]
spectrum_filtered = np.abs(fft(filtered_lp))[:len(signal_noisy)//2]

axes[2, 1].plot(freqs, spectrum_original, 'r-', alpha=0.7, linewidth=1, label='原始频谱')
axes[2, 1].plot(freqs, spectrum_filtered, 'b-', linewidth=2, label='滤波后频谱')
axes[2, 1].set_title('频谱比较 (低通滤波)')
axes[2, 1].set_xlabel('频率 (Hz)')
axes[2, 1].set_ylabel('幅度')
axes[2, 1].set_xlim(0, 150)
axes[2, 1].legend()
axes[2, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

2. FIR滤波器设计

python
# 设计FIR滤波器
from scipy.signal import firwin, freqz

# FIR滤波器参数
filter_length = 101  # 滤波器长度(奇数)

# 设计不同类型的FIR滤波器
# 1. 低通FIR滤波器
fir_lp = firwin(filter_length, 30, fs=fs, pass_zero='lowpass')

# 2. 高通FIR滤波器  
fir_hp = firwin(filter_length, 10, fs=fs, pass_zero='highpass')

# 3. 带通FIR滤波器
fir_bp = firwin(filter_length, [20, 40], fs=fs, pass_zero='bandpass')

# 4. 带阻FIR滤波器
fir_bs = firwin(filter_length, [55, 65], fs=fs, pass_zero='bandstop')

# 计算频率响应
filters = [
    (fir_lp, '低通FIR', 'blue'),
    (fir_hp, '高通FIR', 'green'), 
    (fir_bp, '带通FIR', 'magenta'),
    (fir_bs, '带阻FIR', 'cyan')
]

fig, axes = plt.subplots(2, 2, figsize=(15, 10))

for i, (fir_coeff, name, color) in enumerate(filters):
    # 计算频率响应
    w, h = freqz(fir_coeff, worN=8000, fs=fs)
    
    # 幅度响应
    ax1 = axes[i//2, i%2]
    ax1.plot(w, 20 * np.log10(np.abs(h)), color=color, linewidth=2)
    ax1.set_title(f'{name} - 幅度响应')
    ax1.set_xlabel('频率 (Hz)')
    ax1.set_ylabel('幅度 (dB)')
    ax1.grid(True, alpha=0.3)
    ax1.set_xlim(0, 200)
    ax1.set_ylim(-80, 5)

plt.tight_layout()
plt.show()

# 应用FIR滤波器并比较结果
filtered_fir_lp = signal.convolve(signal_noisy, fir_lp, mode='same')
filtered_iir_lp = signal.filtfilt(b_lp, a_lp, signal_noisy)

# 比较FIR和IIR滤波器
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# 时域比较
axes[0, 0].plot(t[:1000], signal_noisy[:1000], 'r-', alpha=0.5, linewidth=1, label='原始信号')
axes[0, 0].plot(t[:1000], filtered_fir_lp[:1000], 'b-', linewidth=2, label='FIR滤波')
axes[0, 0].plot(t[:1000], filtered_iir_lp[:1000], 'g--', linewidth=2, label='IIR滤波')
axes[0, 0].set_title('时域比较 - 低通滤波')
axes[0, 0].set_xlabel('时间 (s)')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# 频域比较
freqs = fftfreq(len(signal_noisy), 1/fs)[:len(signal_noisy)//2]
spectrum_fir = np.abs(fft(filtered_fir_lp))[:len(signal_noisy)//2]
spectrum_iir = np.abs(fft(filtered_iir_lp))[:len(signal_noisy)//2]

axes[0, 1].plot(freqs, spectrum_fir, 'b-', linewidth=2, label='FIR滤波')
axes[0, 1].plot(freqs, spectrum_iir, 'g--', linewidth=2, label='IIR滤波')
axes[0, 1].set_title('频域比较 - 低通滤波')
axes[0, 1].set_xlabel('频率 (Hz)')
axes[0, 1].set_ylabel('幅度')
axes[0, 1].set_xlim(0, 100)
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# 滤波器系数
axes[1, 0].stem(range(len(fir_lp)), fir_lp, basefmt=" ")
axes[1, 0].set_title('FIR滤波器系数')
axes[1, 0].set_xlabel('系数索引')
axes[1, 0].set_ylabel('系数值')
axes[1, 0].grid(True, alpha=0.3)

# 相位响应比较
w_fir, h_fir = freqz(fir_lp, worN=8000, fs=fs)
w_iir, h_iir = freqz(b_lp, a_lp, worN=8000, fs=fs)

axes[1, 1].plot(w_fir, np.angle(h_fir), 'b-', linewidth=2, label='FIR相位')
axes[1, 1].plot(w_iir, np.angle(h_iir), 'g--', linewidth=2, label='IIR相位')
axes[1, 1].set_title('相位响应比较')
axes[1, 1].set_xlabel('频率 (Hz)')
axes[1, 1].set_ylabel('相位 (弧度)')
axes[1, 1].set_xlim(0, 100)
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\nFIR vs IIR滤波器比较:")
print(f"FIR滤波器长度: {len(fir_lp)} 个系数")
print(f"IIR滤波器阶数: {len(b_lp)-1} (分子) + {len(a_lp)-1} (分母)")
print(f"FIR滤波器特点: 线性相位, 总是稳定")
print(f"IIR滤波器特点: 计算效率高, 可能不稳定")

窗函数

1. 常用窗函数

python
# 窗函数长度
N = 256

# 生成各种窗函数
windows = {
    '矩形窗': signal.boxcar(N),
    '汉宁窗': signal.hann(N),
    '汉明窗': signal.hamming(N),
    '布莱克曼窗': signal.blackman(N),
    '凯泽窗': signal.kaiser(N, beta=8.6),
    '巴特利特窗': signal.bartlett(N),
    '图基窗': signal.tukey(N, alpha=0.5)
}

# 可视化窗函数
fig, axes = plt.subplots(3, 3, figsize=(15, 12))
axes = axes.flatten()

for i, (name, window) in enumerate(windows.items()):
    if i < len(axes):
        axes[i].plot(window, 'b-', linewidth=2)
        axes[i].set_title(name)
        axes[i].set_xlabel('样本索引')
        axes[i].set_ylabel('幅度')
        axes[i].grid(True, alpha=0.3)
        axes[i].set_ylim(0, 1.1)

# 窗函数频谱比较
axes[-2].clear()
for name, window in list(windows.items())[:4]:  # 只显示前4个
    # 计算频谱
    spectrum = np.abs(fft(window, 2048))
    freqs = np.linspace(0, 1, len(spectrum)//2)
    spectrum_db = 20 * np.log10(spectrum[:len(spectrum)//2] / np.max(spectrum))
    
    axes[-2].plot(freqs, spectrum_db, linewidth=2, label=name)

axes[-2].set_title('窗函数频谱比较')
axes[-2].set_xlabel('归一化频率')
axes[-2].set_ylabel('幅度 (dB)')
axes[-2].set_ylim(-120, 10)
axes[-2].legend()
axes[-2].grid(True, alpha=0.3)

# 窗函数参数比较
axes[-1].clear()
window_params = []
for name, window in windows.items():
    # 计算窗函数参数
    coherent_gain = np.mean(window)
    processing_gain = np.sum(window**2) / len(window)
    scalloping_loss = -20 * np.log10(np.mean(window))
    
    window_params.append([name, coherent_gain, processing_gain, scalloping_loss])

# 显示参数表格
axes[-1].axis('tight')
axes[-1].axis('off')
table_data = [[p[0], f'{p[1]:.3f}', f'{p[2]:.3f}', f'{p[3]:.2f}'] for p in window_params]
headers = ['窗函数', '相干增益', '处理增益', '扇贝损失(dB)']
table = axes[-1].table(cellText=table_data, colLabels=headers, 
                      cellLoc='center', loc='center')
table.auto_set_font_size(False)
table.set_fontsize(9)
table.scale(1.2, 1.5)

plt.tight_layout()
plt.show()

print("\n窗函数特性分析:")
for name, cg, pg, sl in window_params:
    print(f"{name:12s}: 相干增益={cg:.3f}, 处理增益={pg:.3f}, 扇贝损失={sl:.2f}dB")

2. 窗函数在频谱分析中的应用

python
# 创建测试信号:两个接近的正弦波
fs = 1000
t = np.linspace(0, 2, 2000, endpoint=False)
f1, f2 = 50, 52  # 两个接近的频率
signal_test = np.sin(2*np.pi*f1*t) + 0.8*np.sin(2*np.pi*f2*t)

# 使用不同窗函数进行频谱分析
window_types = ['boxcar', 'hann', 'blackman', 'kaiser']
window_params = [None, None, None, 8.6]  # kaiser窗的beta参数

fig, axes = plt.subplots(2, 2, figsize=(15, 10))
axes = axes.flatten()

for i, (window_type, param) in enumerate(zip(window_types, window_params)):
    # 选择信号段进行分析
    segment_length = 512
    start_idx = 500
    signal_segment = signal_test[start_idx:start_idx+segment_length]
    
    # 应用窗函数
    if window_type == 'kaiser':
        window = signal.get_window((window_type, param), segment_length)
    else:
        window = signal.get_window(window_type, segment_length)
    
    windowed_signal = signal_segment * window
    
    # 计算频谱
    spectrum = fft(windowed_signal, 2048)
    freqs = fftfreq(2048, 1/fs)[:1024]
    magnitude_db = 20 * np.log10(np.abs(spectrum[:1024]) / np.max(np.abs(spectrum)))
    
    # 绘制频谱
    axes[i].plot(freqs, magnitude_db, 'b-', linewidth=1.5)
    axes[i].axvline(x=f1, color='r', linestyle='--', alpha=0.7, label=f'{f1} Hz')
    axes[i].axvline(x=f2, color='g', linestyle='--', alpha=0.7, label=f'{f2} Hz')
    axes[i].set_title(f'{window_type.capitalize()} 窗函数')
    axes[i].set_xlabel('频率 (Hz)')
    axes[i].set_ylabel('幅度 (dB)')
    axes[i].set_xlim(40, 60)
    axes[i].set_ylim(-80, 5)
    axes[i].legend()
    axes[i].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# 频率分辨率分析
print("\n窗函数对频率分辨率的影响:")
for window_type, param in zip(window_types, window_params):
    if window_type == 'kaiser':
        window = signal.get_window((window_type, param), segment_length)
    else:
        window = signal.get_window(window_type, segment_length)
    
    # 计算等效噪声带宽
    enbw = fs * np.sum(window**2) / (np.sum(window)**2)
    
    print(f"{window_type.capitalize():12s}: ENBW = {enbw:.3f} Hz")

卷积和相关

1. 卷积运算

python
# 创建测试信号
t1 = np.linspace(0, 2, 200, endpoint=False)
signal1 = np.exp(-t1) * np.sin(2*np.pi*5*t1)  # 衰减正弦波

t2 = np.linspace(0, 1, 100, endpoint=False) 
signal2 = signal.gaussian(100, std=10)  # 高斯脉冲

# 不同类型的卷积
conv_full = signal.convolve(signal1, signal2, mode='full')
conv_same = signal.convolve(signal1, signal2, mode='same')
conv_valid = signal.convolve(signal1, signal2, mode='valid')

# 可视化卷积结果
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# 原始信号
axes[0, 0].plot(t1, signal1, 'b-', linewidth=2, label='信号1')
axes[0, 0].plot(t2, signal2, 'r-', linewidth=2, label='信号2')
axes[0, 0].set_title('原始信号')
axes[0, 0].set_xlabel('时间')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Full卷积
axes[0, 1].plot(conv_full, 'g-', linewidth=2)
axes[0, 1].set_title(f'Full卷积 (长度: {len(conv_full)})')
axes[0, 1].set_xlabel('样本索引')
axes[0, 1].set_ylabel('幅度')
axes[0, 1].grid(True, alpha=0.3)

# Same卷积
axes[1, 0].plot(conv_same, 'm-', linewidth=2)
axes[1, 0].set_title(f'Same卷积 (长度: {len(conv_same)})')
axes[1, 0].set_xlabel('样本索引')
axes[1, 0].set_ylabel('幅度')
axes[1, 0].grid(True, alpha=0.3)

# Valid卷积
axes[1, 1].plot(conv_valid, 'c-', linewidth=2)
axes[1, 1].set_title(f'Valid卷积 (长度: {len(conv_valid)})')
axes[1, 1].set_xlabel('样本索引')
axes[1, 1].set_ylabel('幅度')
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("卷积结果长度:")
print(f"信号1长度: {len(signal1)}")
print(f"信号2长度: {len(signal2)}")
print(f"Full卷积: {len(conv_full)} = {len(signal1)} + {len(signal2)} - 1")
print(f"Same卷积: {len(conv_same)} = max({len(signal1)}, {len(signal2)})")
print(f"Valid卷积: {len(conv_valid)} = |{len(signal1)} - {len(signal2)}| + 1")

2. 相关分析

python
# 创建相关分析的测试信号
np.random.seed(42)
t = np.linspace(0, 10, 1000)

# 原始信号
original_signal = np.sin(2*np.pi*2*t) + 0.5*np.sin(2*np.pi*5*t)

# 延迟信号
delay_samples = 50
delayed_signal = np.zeros_like(original_signal)
delayed_signal[delay_samples:] = original_signal[:-delay_samples]

# 添加噪声
noisy_signal = original_signal + 0.3*np.random.normal(0, 1, len(t))

# 计算不同类型的相关
# 1. 自相关
autocorr = signal.correlate(original_signal, original_signal, mode='full')
autocorr_lags = signal.correlation_lags(len(original_signal), len(original_signal), mode='full')

# 2. 互相关
crosscorr = signal.correlate(original_signal, delayed_signal, mode='full')
crosscorr_lags = signal.correlation_lags(len(original_signal), len(delayed_signal), mode='full')

# 3. 噪声信号的相关
noise_corr = signal.correlate(original_signal, noisy_signal, mode='full')
noise_corr_lags = signal.correlation_lags(len(original_signal), len(noisy_signal), mode='full')

# 可视化相关分析结果
fig, axes = plt.subplots(3, 2, figsize=(15, 12))

# 原始信号
axes[0, 0].plot(t[:200], original_signal[:200], 'b-', linewidth=2, label='原始信号')
axes[0, 0].plot(t[:200], delayed_signal[:200], 'r--', linewidth=2, label='延迟信号')
axes[0, 0].set_title('信号比较')
axes[0, 0].set_xlabel('时间')
axes[0, 0].set_ylabel('幅度')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# 自相关
axes[0, 1].plot(autocorr_lags, autocorr, 'g-', linewidth=2)
axes[0, 1].set_title('自相关函数')
axes[0, 1].set_xlabel('滞后 (样本)')
axes[0, 1].set_ylabel('相关值')
axes[0, 1].set_xlim(-200, 200)
axes[0, 1].grid(True, alpha=0.3)

# 互相关
axes[1, 0].plot(crosscorr_lags, crosscorr, 'm-', linewidth=2)
max_corr_idx = np.argmax(crosscorr)
max_lag = crosscorr_lags[max_corr_idx]
axes[1, 0].axvline(x=max_lag, color='r', linestyle='--', 
                  label=f'最大相关滞后: {max_lag}')
axes[1, 0].set_title('互相关函数')
axes[1, 0].set_xlabel('滞后 (样本)')
axes[1, 0].set_ylabel('相关值')
axes[1, 0].set_xlim(-200, 200)
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# 含噪信号相关
axes[1, 1].plot(t[:200], original_signal[:200], 'b-', linewidth=2, label='原始信号')
axes[1, 1].plot(t[:200], noisy_signal[:200], 'r-', alpha=0.7, linewidth=1, label='含噪信号')
axes[1, 1].set_title('含噪信号比较')
axes[1, 1].set_xlabel('时间')
axes[1, 1].set_ylabel('幅度')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

# 含噪信号相关函数
axes[2, 0].plot(noise_corr_lags, noise_corr, 'c-', linewidth=2)
axes[2, 0].set_title('含噪信号相关函数')
axes[2, 0].set_xlabel('滞后 (样本)')
axes[2, 0].set_ylabel('相关值')
axes[2, 0].set_xlim(-200, 200)
axes[2, 0].grid(True, alpha=0.3)

# 相关系数计算
corr_coeff = np.corrcoef(original_signal, noisy_signal)[0, 1]
axes[2, 1].scatter(original_signal[::10], noisy_signal[::10], alpha=0.6)
axes[2, 1].set_title(f'散点图 (相关系数: {corr_coeff:.3f})')
axes[2, 1].set_xlabel('原始信号')
axes[2, 1].set_ylabel('含噪信号')
axes[2, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n相关分析结果:")
print(f"检测到的延迟: {max_lag} 样本 (实际延迟: {delay_samples} 样本)")
print(f"原始信号与含噪信号的相关系数: {corr_coeff:.3f}")
print(f"自相关最大值位置: {autocorr_lags[np.argmax(autocorr)]} (应该为0)")

实际应用案例

1. 心电图(ECG)信号处理

python
# 模拟ECG信号
def generate_ecg_signal(duration=10, fs=1000, heart_rate=72):
    """生成模拟ECG信号"""
    t = np.linspace(0, duration, int(fs * duration), endpoint=False)
    
    # 基本心跳周期
    beat_period = 60 / heart_rate  # 秒
    
    ecg = np.zeros_like(t)
    
    for beat_time in np.arange(0, duration, beat_period):
        # P波
        p_center = beat_time + 0.1
        p_width = 0.05
        p_amplitude = 0.1
        
        # QRS复合波
        qrs_center = beat_time + 0.2
        qrs_width = 0.04
        qrs_amplitude = 1.0
        
        # T波
        t_center = beat_time + 0.4
        t_width = 0.08
        t_amplitude = 0.3
        
        # 添加波形
        for i, time_point in enumerate(t):
            # P波
            if abs(time_point - p_center) < p_width:
                ecg[i] += p_amplitude * np.exp(-((time_point - p_center) / (p_width/3))**2)
            
            # QRS波
            if abs(time_point - qrs_center) < qrs_width:
                ecg[i] += qrs_amplitude * np.exp(-((time_point - qrs_center) / (qrs_width/4))**2)
            
            # T波
            if abs(time_point - t_center) < t_width:
                ecg[i] += t_amplitude * np.exp(-((time_point - t_center) / (t_width/3))**2)
    
    return t, ecg

# 生成ECG信号
t_ecg, ecg_clean = generate_ecg_signal(duration=5, fs=1000, heart_rate=75)

# 添加噪声和干扰
np.random.seed(42)
# 基线漂移 (低频)
baseline_drift = 0.1 * np.sin(2*np.pi*0.5*t_ecg)
# 肌电干扰 (高频)
emg_noise = 0.05 * np.random.normal(0, 1, len(t_ecg))
# 工频干扰 (50Hz)
power_line_noise = 0.03 * np.sin(2*np.pi*50*t_ecg)

ecg_noisy = ecg_clean + baseline_drift + emg_noise + power_line_noise

# ECG信号滤波处理
fs_ecg = 1000
nyquist_ecg = fs_ecg / 2

# 1. 高通滤波器 - 去除基线漂移
hp_cutoff = 0.5 / nyquist_ecg
b_hp_ecg, a_hp_ecg = signal.butter(4, hp_cutoff, btype='high')
ecg_hp = signal.filtfilt(b_hp_ecg, a_hp_ecg, ecg_noisy)

# 2. 低通滤波器 - 去除高频噪声
lp_cutoff = 40 / nyquist_ecg
b_lp_ecg, a_lp_ecg = signal.butter(4, lp_cutoff, btype='low')
ecg_lp = signal.filtfilt(b_lp_ecg, a_lp_ecg, ecg_hp)

# 3. 陷波滤波器 - 去除工频干扰
notch_freq = 50 / nyquist_ecg
Q = 30  # 品质因子
b_notch, a_notch = signal.iirnotch(notch_freq, Q)
ecg_filtered = signal.filtfilt(b_notch, a_notch, ecg_lp)

# R波检测
from scipy.signal import find_peaks

# 使用峰值检测算法
peaks, properties = find_peaks(ecg_filtered, 
                              height=0.5,      # 最小高度
                              distance=int(0.6*fs_ecg),  # 最小间距(对应100bpm)
                              prominence=0.3)   # 显著性

# 计算心率
rr_intervals = np.diff(peaks) / fs_ecg  # R-R间期(秒)
heart_rates = 60 / rr_intervals  # 瞬时心率(bpm)
avg_heart_rate = np.mean(heart_rates)

# 可视化ECG处理结果
fig, axes = plt.subplots(4, 1, figsize=(15, 12))

# 原始和含噪ECG
axes[0].plot(t_ecg, ecg_clean, 'g-', linewidth=2, label='原始ECG')
axes[0].plot(t_ecg, ecg_noisy, 'r-', alpha=0.7, linewidth=1, label='含噪ECG')
axes[0].set_title('ECG信号对比')
axes[0].set_ylabel('幅度 (mV)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].set_xlim(0, 3)

# 滤波过程
axes[1].plot(t_ecg, ecg_noisy, 'r-', alpha=0.5, linewidth=1, label='含噪ECG')
axes[1].plot(t_ecg, ecg_hp, 'b-', alpha=0.7, linewidth=1, label='高通滤波')
axes[1].plot(t_ecg, ecg_lp, 'm-', alpha=0.7, linewidth=1, label='低通滤波')
axes[1].plot(t_ecg, ecg_filtered, 'g-', linewidth=2, label='最终滤波')
axes[1].set_title('ECG滤波处理过程')
axes[1].set_ylabel('幅度 (mV)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].set_xlim(0, 3)

# R波检测结果
axes[2].plot(t_ecg, ecg_filtered, 'b-', linewidth=2, label='滤波后ECG')
axes[2].plot(t_ecg[peaks], ecg_filtered[peaks], 'ro', markersize=8, label='检测到的R波')
axes[2].set_title(f'R波检测 (检测到 {len(peaks)} 个R波)')
axes[2].set_ylabel('幅度 (mV)')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
axes[2].set_xlim(0, 5)

# 心率变化
if len(heart_rates) > 0:
    axes[3].plot(t_ecg[peaks[1:]], heart_rates, 'ro-', linewidth=2, markersize=6)
    axes[3].axhline(y=avg_heart_rate, color='g', linestyle='--', 
                   label=f'平均心率: {avg_heart_rate:.1f} bpm')
    axes[3].set_title('瞬时心率变化')
    axes[3].set_xlabel('时间 (s)')
    axes[3].set_ylabel('心率 (bpm)')
    axes[3].legend()
    axes[3].grid(True, alpha=0.3)
    axes[3].set_ylim(60, 90)

plt.tight_layout()
plt.show()

print("\nECG信号分析结果:")
print(f"检测到的R波数量: {len(peaks)}")
print(f"平均心率: {avg_heart_rate:.1f} bpm")
print(f"心率变异性 (RMSSD): {np.sqrt(np.mean(np.diff(rr_intervals)**2))*1000:.1f} ms")
print(f"R-R间期范围: {np.min(rr_intervals):.3f} - {np.max(rr_intervals):.3f} s")

2. 音频信号处理

python
# 生成音频信号示例
def generate_audio_signal(duration=3, fs=44100):
    """生成模拟音频信号"""
    t = np.linspace(0, duration, int(fs * duration), endpoint=False)
    
    # 基频和谐波
    fundamental = 440  # A4音符
    audio = (np.sin(2*np.pi*fundamental*t) +           # 基频
            0.5*np.sin(2*np.pi*2*fundamental*t) +      # 二次谐波
            0.3*np.sin(2*np.pi*3*fundamental*t) +      # 三次谐波
            0.2*np.sin(2*np.pi*4*fundamental*t))       # 四次谐波
    
    # 添加包络
    envelope = np.exp(-t/2)  # 指数衰减
    audio *= envelope
    
    return t, audio

# 生成音频信号
t_audio, audio_clean = generate_audio_signal(duration=2, fs=8000)
fs_audio = 8000

# 添加噪声和失真
np.random.seed(42)
audio_noise = 0.1 * np.random.normal(0, 1, len(t_audio))
audio_noisy = audio_clean + audio_noise

# 音频信号处理
# 1. 降噪滤波
nyquist_audio = fs_audio / 2
lp_cutoff_audio = 4000 / nyquist_audio  # 低通滤波
b_lp_audio, a_lp_audio = signal.butter(6, lp_cutoff_audio, btype='low')
audio_filtered = signal.filtfilt(b_lp_audio, a_lp_audio, audio_noisy)

# 2. 频谱分析
f_audio, t_audio_stft, Zxx_audio = signal.stft(audio_filtered, fs=fs_audio, 
                                              window='hann', nperseg=512, noverlap=256)

# 3. 音频特征提取
# 计算频谱质心
spectral_centroids = []
for i in range(Zxx_audio.shape[1]):
    magnitude = np.abs(Zxx_audio[:, i])
    if np.sum(magnitude) > 0:
        centroid = np.sum(f_audio * magnitude) / np.sum(magnitude)
        spectral_centroids.append(centroid)
    else:
        spectral_centroids.append(0)

# 计算过零率
def zero_crossing_rate(signal_data, frame_length=512, hop_length=256):
    """计算过零率"""
    zcr = []
    for i in range(0, len(signal_data) - frame_length, hop_length):
        frame = signal_data[i:i+frame_length]
        crossings = np.sum(np.abs(np.diff(np.sign(frame)))) / 2
        zcr.append(crossings / frame_length)
    return np.array(zcr)

zcr = zero_crossing_rate(audio_filtered)

# 可视化音频处理结果
fig, axes = plt.subplots(4, 1, figsize=(15, 12))

# 原始和处理后的音频
axes[0].plot(t_audio[:8000], audio_clean[:8000], 'g-', linewidth=1, label='原始音频')
axes[0].plot(t_audio[:8000], audio_noisy[:8000], 'r-', alpha=0.7, linewidth=1, label='含噪音频')
axes[0].plot(t_audio[:8000], audio_filtered[:8000], 'b-', linewidth=1.5, label='滤波后')
axes[0].set_title('音频信号处理')
axes[0].set_ylabel('幅度')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# 频谱图
im = axes[1].pcolormesh(t_audio_stft, f_audio, 20*np.log10(np.abs(Zxx_audio)), 
                       shading='gouraud', cmap='viridis')
axes[1].set_title('音频频谱图')
axes[1].set_ylabel('频率 (Hz)')
axes[1].set_ylim(0, 2000)
plt.colorbar(im, ax=axes[1], label='幅度 (dB)')

# 频谱质心
axes[2].plot(t_audio_stft, spectral_centroids, 'r-', linewidth=2)
axes[2].set_title('频谱质心变化')
axes[2].set_ylabel('频率 (Hz)')
axes[2].grid(True, alpha=0.3)

# 过零率
zcr_time = np.arange(len(zcr)) * 256 / fs_audio
axes[3].plot(zcr_time, zcr, 'b-', linewidth=2)
axes[3].set_title('过零率')
axes[3].set_xlabel('时间 (s)')
axes[3].set_ylabel('过零率')
axes[3].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n音频信号分析结果:")
print(f"平均频谱质心: {np.mean(spectral_centroids):.1f} Hz")
print(f"平均过零率: {np.mean(zcr):.4f}")
print(f"信号持续时间: {len(t_audio)/fs_audio:.2f} 秒")
print(f"采样频率: {fs_audio} Hz")

性能优化技巧

1. 算法选择和参数优化

python
import time

# 比较不同卷积算法的性能
def benchmark_convolution():
    """比较不同卷积算法的性能"""
    # 创建测试信号
    signal_length = 10000
    kernel_length = 100
    
    test_signal = np.random.randn(signal_length)
    test_kernel = np.random.randn(kernel_length)
    
    methods = {
        'direct': lambda: signal.convolve(test_signal, test_kernel, method='direct'),
        'fft': lambda: signal.convolve(test_signal, test_kernel, method='fft'),
        'auto': lambda: signal.convolve(test_signal, test_kernel, method='auto')
    }
    
    results = {}
    
    for name, method in methods.items():
        # 预热
        method()
        
        # 计时
        start_time = time.time()
        for _ in range(10):
            result = method()
        end_time = time.time()
        
        results[name] = (end_time - start_time) / 10
    
    return results

# 运行性能测试
conv_times = benchmark_convolution()

print("卷积算法性能比较:")
for method, time_taken in conv_times.items():
    print(f"{method:8s}: {time_taken*1000:.2f} ms")

# 最快的方法
fastest_method = min(conv_times, key=conv_times.get)
print(f"\n最快方法: {fastest_method}")

2. 内存优化

python
# 大信号处理的内存优化策略
def process_large_signal_chunked(signal_data, chunk_size=1024, overlap=128):
    """分块处理大信号以节省内存"""
    processed_chunks = []
    
    for i in range(0, len(signal_data) - overlap, chunk_size - overlap):
        # 提取块
        chunk = signal_data[i:i + chunk_size]
        
        # 处理块(这里以低通滤波为例)
        b, a = signal.butter(4, 0.1)
        processed_chunk = signal.filtfilt(b, a, chunk)
        
        # 去除重叠部分(除了第一块)
        if i == 0:
            processed_chunks.append(processed_chunk)
        else:
            processed_chunks.append(processed_chunk[overlap//2:])
    
    return np.concatenate(processed_chunks)

# 示例:处理大信号
large_signal = np.random.randn(50000)
processed_signal = process_large_signal_chunked(large_signal)

print(f"\n大信号处理:")
print(f"原始信号长度: {len(large_signal)}")
print(f"处理后信号长度: {len(processed_signal)}")
print(f"内存使用优化: 分块处理")

小结

本章详细介绍了 SciPy 的信号处理功能,主要内容包括:

核心概念

  • 信号生成: 掌握了各种基本波形的生成方法
  • 频谱分析: 学习了FFT、STFT等频域分析技术
  • 数字滤波: 理解了IIR和FIR滤波器的设计与应用
  • 窗函数: 了解了不同窗函数对频谱分析的影响
  • 卷积相关: 掌握了信号的卷积和相关运算

实际应用

  • 生物医学信号: ECG信号的滤波和特征提取
  • 音频处理: 音频信号的降噪和特征分析
  • 性能优化: 大信号处理的内存和计算优化

最佳实践

  1. 选择合适的滤波器类型:根据应用需求选择IIR或FIR
  2. 窗函数的选择:平衡频率分辨率和泄漏抑制
  3. 参数调优:根据信号特性调整滤波器参数
  4. 性能考虑:对于大数据量采用分块处理策略

练习题

基础练习

  1. 信号生成练习

    • 生成一个包含多个频率成分的复合信号
    • 添加不同类型的噪声
    • 使用FFT分析其频谱特性
  2. 滤波器设计练习

    • 设计一个带通滤波器去除特定频率的干扰
    • 比较Butterworth、Chebyshev和Elliptic滤波器的性能
    • 分析滤波器的幅度和相位响应
  3. 窗函数应用练习

    • 使用不同窗函数分析同一信号的频谱
    • 比较各窗函数的频率分辨率和泄漏特性
    • 选择最适合特定应用的窗函数

进阶练习

  1. 实际信号处理项目

    • 处理一段真实的音频信号(如语音或音乐)
    • 实现噪声抑制算法
    • 提取音频的特征参数
  2. 自适应滤波练习

    • 实现一个简单的自适应滤波器
    • 用于消除未知特性的干扰信号
    • 分析收敛性能和稳态误差

通过这些练习,你将能够熟练运用SciPy进行各种信号处理任务,为后续的高级应用打下坚实基础。

本站内容仅供学习和研究使用。