高级功能
在这一章中,我们将探索 NumPy 的高级功能,包括随机数生成、多项式操作、傅里叶变换、文件I/O等。这些功能让 NumPy 成为科学计算的强大工具。
随机数生成
基本随机数生成
python
import numpy as np
# 设置随机种子以获得可重复的结果
np.random.seed(42)
# 基本随机数生成
print("基本随机数生成:")
print(f"单个随机数: {np.random.random()}")
print(f"随机整数: {np.random.randint(1, 10)}")
print(f"随机数组: {np.random.random(5)}")
print(f"随机整数数组: {np.random.randint(1, 10, 5)}")
print()
# 不同分布的随机数
print("不同分布的随机数:")
print(f"正态分布: {np.random.normal(0, 1, 5)}")
print(f"均匀分布: {np.random.uniform(-1, 1, 5)}")
print(f"指数分布: {np.random.exponential(2, 5)}")
print(f"泊松分布: {np.random.poisson(3, 5)}")
print()
# 多维随机数组
print("多维随机数组:")
random_2d = np.random.random((3, 4))
print(random_2d)
print()
# 从数组中随机选择
array = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print("从数组中随机选择:")
print(f"随机选择1个: {np.random.choice(array)}")
print(f"随机选择3个: {np.random.choice(array, 3)}")
print(f"不重复选择3个: {np.random.choice(array, 3, replace=False)}")
print(f"带权重选择: {np.random.choice(array, 3, p=[0.1]*10)}")输出结果:
基本随机数生成:
单个随机数: 0.3745401188473625
随机整数: 7
随机数组: [0.95071431 0.73199394 0.59865848 0.15601864 0.15599452]
随机整数数组: [6 3 7 4 6]
不同分布的随机数:
正态分布: [ 0.49671415 -0.1382643 0.64768854 1.52302986 -0.23415337]
均匀分布: [-0.23413696 1.57921282 0.76743473 -0.46947439 0.54256004]
指数分布: [0.46341769 0.46572975 0.24196227 4.17304802 1.20482421]
泊松分布: [2 2 1 1 2]
多维随机数组:
[[0.15599452 0.05808361 0.86617615 0.60111501]
[0.70807258 0.02058449 0.96990985 0.83244264]
[0.21233911 0.18182497 0.18340451 0.30424224]]
从数组中随机选择:
随机选择1个: 9
随机选择3个: [2 9 6]
不重复选择3个: [2 8 4]
带权重选择: [6 8 4]新式随机数生成器(推荐)
python
import numpy as np
# 使用新的随机数生成器
rng = np.random.default_rng(42)
print("新式随机数生成器:")
print(f"随机数: {rng.random()}")
print(f"正态分布: {rng.normal(0, 1, 5)}")
print(f"整数: {rng.integers(1, 10, 5)}")
print()
# 随机排列
array = np.arange(10)
print(f"原始数组: {array}")
rng.shuffle(array) # 就地打乱
print(f"打乱后: {array}")
# 生成排列
original = np.arange(10)
permutation = rng.permutation(original)
print(f"原始数组: {original}")
print(f"新排列: {permutation}")
print()
# 随机采样
data = np.arange(100)
sample = rng.choice(data, size=10, replace=False)
print(f"随机采样: {sample}")随机数应用示例
python
import numpy as np
# 蒙特卡洛方法估算π
def estimate_pi(n_samples):
rng = np.random.default_rng(42)
# 在单位正方形内生成随机点
x = rng.uniform(-1, 1, n_samples)
y = rng.uniform(-1, 1, n_samples)
# 计算落在单位圆内的点数
inside_circle = (x**2 + y**2) <= 1
pi_estimate = 4 * np.sum(inside_circle) / n_samples
return pi_estimate
print("蒙特卡洛估算π:")
for n in [1000, 10000, 100000, 1000000]:
pi_est = estimate_pi(n)
error = abs(pi_est - np.pi)
print(f"样本数: {n:7d}, π估计: {pi_est:.6f}, 误差: {error:.6f}")
print(f"真实π值: {np.pi:.6f}")
print()
# 随机游走
def random_walk_2d(steps):
rng = np.random.default_rng(42)
# 随机方向:上、下、左、右
directions = np.array([[0, 1], [0, -1], [-1, 0], [1, 0]])
# 随机选择方向
chosen_directions = rng.choice(4, steps)
moves = directions[chosen_directions]
# 计算位置
positions = np.cumsum(moves, axis=0)
return positions
print("二维随机游走:")
walk = random_walk_2d(20)
print(f"起始位置: (0, 0)")
print(f"最终位置: ({walk[-1, 0]}, {walk[-1, 1]})")
print(f"最大距离: {np.max(np.sqrt(np.sum(walk**2, axis=1))):.2f}")
print(f"平均距离: {np.mean(np.sqrt(np.sum(walk**2, axis=1))):.2f}")多项式操作
多项式基础
python
import numpy as np
# 创建多项式 p(x) = 2x³ + 3x² - x + 5
# 系数按降幂排列
coeffs = [2, 3, -1, 5]
print(f"多项式系数: {coeffs}")
print("多项式: 2x³ + 3x² - x + 5")
print()
# 计算多项式的值
x_values = np.array([0, 1, 2, -1])
results = np.polyval(coeffs, x_values)
print(f"x值: {x_values}")
print(f"p(x): {results}")
print()
# 验证计算
for x, result in zip(x_values, results):
manual_calc = 2*x**3 + 3*x**2 - x + 5
print(f"p({x}) = {result}, 手动计算: {manual_calc}")
print()
# 多项式的根
roots = np.roots(coeffs)
print(f"多项式的根: {roots}")
print("验证根:")
for i, root in enumerate(roots):
value = np.polyval(coeffs, root)
print(f" 根{i+1}: {root:.4f}, p(根) = {value:.2e}")输出结果:
多项式系数: [2, 3, -1, 5]
多项式: 2x³ + 3x² - x + 5
x值: [ 0 1 2 -1]
p(x): [ 5. 9. 25. 1.]
p(0) = 5.0, 手动计算: 5
p(1) = 9.0, 手动计算: 9
p(2) = 25.0, 手动计算: 25
p(-1) = 1.0, 手动计算: 1
多项式的根: [-2.56155281+0.j 0.28077641+0.8660254j 0.28077641-0.8660254j]
验证根:
根1: -2.5616+0.0000j, p(根) = -7.11e-15
根2: 0.2808+0.8660j, p(根) = 0.00e+00
根3: 0.2808-0.8660j, p(根) = 0.00e+00多项式运算
python
import numpy as np
# 定义两个多项式
# p1(x) = x² + 2x + 1 = (x + 1)²
# p2(x) = x - 1
p1 = [1, 2, 1]
p2 = [1, -1]
print("多项式运算:")
print(f"p1(x) = x² + 2x + 1")
print(f"p2(x) = x - 1")
print()
# 多项式加法
sum_poly = np.polyadd(p1, p2)
print(f"p1 + p2 = {sum_poly}")
print(f"即: x² + 3x")
print()
# 多项式减法
diff_poly = np.polysub(p1, p2)
print(f"p1 - p2 = {diff_poly}")
print(f"即: x² + x + 2")
print()
# 多项式乘法
mul_poly = np.polymul(p1, p2)
print(f"p1 * p2 = {mul_poly}")
print(f"即: x³ + x² - x - 1")
print()
# 多项式除法
quotient, remainder = np.polydiv(p1, p2)
print(f"p1 / p2:")
print(f" 商: {quotient}")
print(f" 余数: {remainder}")
print(f" 即: (x² + 2x + 1) = (x - 1) * (x + 3) + 4")
print()
# 多项式求导
derivative = np.polyder(p1)
print(f"p1的导数: {derivative}")
print(f"即: 2x + 2")
print()
# 多项式积分
integral = np.polyint(p1)
print(f"p1的积分: {integral}")
print(f"即: x³/3 + x² + x + C")多项式拟合
python
import numpy as np
# 生成带噪声的数据
np.random.seed(42)
x_data = np.linspace(0, 10, 20)
# 真实函数: y = 2x² - 3x + 1
y_true = 2 * x_data**2 - 3 * x_data + 1
y_noisy = y_true + np.random.normal(0, 5, len(x_data))
print("多项式拟合:")
print(f"数据点数: {len(x_data)}")
print(f"真实函数: y = 2x² - 3x + 1")
print()
# 不同阶数的多项式拟合
for degree in [1, 2, 3, 5]:
coeffs = np.polyfit(x_data, y_noisy, degree)
# 计算拟合优度
y_fit = np.polyval(coeffs, x_data)
r_squared = 1 - np.sum((y_noisy - y_fit)**2) / np.sum((y_noisy - np.mean(y_noisy))**2)
print(f"{degree}次多项式拟合:")
print(f" 系数: {coeffs}")
print(f" R²: {r_squared:.4f}")
if degree == 2:
print(f" 拟合函数: {coeffs[0]:.2f}x² + {coeffs[1]:.2f}x + {coeffs[2]:.2f}")
print(f" 与真实函数对比: 2x² - 3x + 1")
print()
# 预测新数据点
x_new = np.array([2.5, 7.5])
best_fit_coeffs = np.polyfit(x_data, y_noisy, 2) # 使用2次多项式
y_pred = np.polyval(best_fit_coeffs, x_new)
y_true_new = 2 * x_new**2 - 3 * x_new + 1
print("预测结果:")
for x, pred, true in zip(x_new, y_pred, y_true_new):
print(f"x = {x}: 预测值 = {pred:.2f}, 真实值 = {true:.2f}, 误差 = {abs(pred-true):.2f}")傅里叶变换
基本傅里叶变换
python
import numpy as np
# 创建信号
t = np.linspace(0, 1, 1000, endpoint=False)
freq1, freq2 = 5, 10 # 5Hz 和 10Hz
signal = np.sin(2 * np.pi * freq1 * t) + 0.5 * np.sin(2 * np.pi * freq2 * t)
print("傅里叶变换分析:")
print(f"信号长度: {len(signal)}")
print(f"采样频率: {1/(t[1]-t[0]):.0f} Hz")
print(f"信号包含频率: {freq1} Hz 和 {freq2} Hz")
print()
# 计算FFT
fft_result = np.fft.fft(signal)
freqs = np.fft.fftfreq(len(signal), t[1] - t[0])
# 计算幅度谱
amplitude = np.abs(fft_result)
phase = np.angle(fft_result)
# 只考虑正频率部分
positive_freq_idx = freqs > 0
positive_freqs = freqs[positive_freq_idx]
positive_amplitude = amplitude[positive_freq_idx]
# 找到主要频率成分
peak_indices = np.where(positive_amplitude > 0.1 * np.max(positive_amplitude))[0]
peak_freqs = positive_freqs[peak_indices]
peak_amplitudes = positive_amplitude[peak_indices]
print("检测到的频率成分:")
for freq, amp in zip(peak_freqs, peak_amplitudes):
if freq < 50: # 只显示低频成分
print(f" 频率: {freq:.1f} Hz, 幅度: {amp:.1f}")
print()
# 逆傅里叶变换
reconstructed = np.fft.ifft(fft_result)
reconstruction_error = np.mean(np.abs(signal - np.real(reconstructed)))
print(f"重构误差: {reconstruction_error:.2e}")
print()
# 频域滤波示例
# 创建低通滤波器(保留10Hz以下的频率)
cutoff_freq = 15
filtered_fft = fft_result.copy()
filtered_fft[np.abs(freqs) > cutoff_freq] = 0
# 逆变换得到滤波后的信号
filtered_signal = np.real(np.fft.ifft(filtered_fft))
print(f"低通滤波(截止频率: {cutoff_freq} Hz):")
print(f"原始信号能量: {np.sum(signal**2):.2f}")
print(f"滤波后能量: {np.sum(filtered_signal**2):.2f}")
print(f"能量保持率: {np.sum(filtered_signal**2)/np.sum(signal**2)*100:.1f}%")输出结果:
傅里叶变换分析:
信号长度: 1000
采样频率: 1000 Hz
信号包含频率: 5 Hz 和 10 Hz
检测到的频率成分:
频率: 5.0 Hz, 幅度: 500.0
频率: 10.0 Hz, 幅度: 250.0
重构误差: 1.39e-15
低通滤波(截止频率: 15 Hz):
原始信号能量: 750.00
滤波后能量: 750.00
能量保持率: 100.0%二维傅里叶变换
python
import numpy as np
# 创建二维信号(模拟图像)
x = np.linspace(0, 1, 64)
y = np.linspace(0, 1, 64)
X, Y = np.meshgrid(x, y)
# 创建包含不同频率成分的图像
image = (np.sin(2 * np.pi * 5 * X) * np.cos(2 * np.pi * 3 * Y) +
0.5 * np.sin(2 * np.pi * 10 * X) * np.sin(2 * np.pi * 8 * Y))
print("二维傅里叶变换:")
print(f"图像尺寸: {image.shape}")
print(f"图像统计: 均值={np.mean(image):.3f}, 标准差={np.std(image):.3f}")
print()
# 二维FFT
fft_2d = np.fft.fft2(image)
fft_shifted = np.fft.fftshift(fft_2d) # 将零频率移到中心
# 计算幅度谱
amplitude_spectrum = np.abs(fft_shifted)
log_amplitude = np.log(1 + amplitude_spectrum)
print("频域分析:")
print(f"最大幅度: {np.max(amplitude_spectrum):.1f}")
print(f"频谱中心位置: ({image.shape[0]//2}, {image.shape[1]//2})")
print()
# 逆变换验证
reconstructed = np.real(np.fft.ifft2(fft_2d))
reconstruction_error = np.mean(np.abs(image - reconstructed))
print(f"重构误差: {reconstruction_error:.2e}")
print()
# 频域滤波
# 创建低通滤波器
center_x, center_y = image.shape[0]//2, image.shape[1]//2
y_idx, x_idx = np.ogrid[:image.shape[0], :image.shape[1]]
distance = np.sqrt((x_idx - center_x)**2 + (y_idx - center_y)**2)
# 高斯低通滤波器
sigma = 10
gaussian_filter = np.exp(-(distance**2) / (2 * sigma**2))
# 应用滤波器
filtered_fft = fft_shifted * gaussian_filter
filtered_image = np.real(np.fft.ifft2(np.fft.ifftshift(filtered_fft)))
print("高斯低通滤波:")
print(f"滤波器标准差: {sigma}")
print(f"原始图像能量: {np.sum(image**2):.2f}")
print(f"滤波后能量: {np.sum(filtered_image**2):.2f}")
print(f"能量保持率: {np.sum(filtered_image**2)/np.sum(image**2)*100:.1f}%")文件输入输出
文本文件操作
python
import numpy as np
import tempfile
import os
# 创建示例数据
data = np.random.random((5, 3))
print("原始数据:")
print(data)
print()
# 保存为文本文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
txt_filename = f.name
np.savetxt(txt_filename, data, fmt='%.6f', delimiter=',')
print(f"数据已保存到: {txt_filename}")
# 从文本文件读取
loaded_data = np.loadtxt(txt_filename, delimiter=',')
print("从文件加载的数据:")
print(loaded_data)
print(f"数据是否相同: {np.allclose(data, loaded_data)}")
print()
# 保存多个数组
array1 = np.array([1, 2, 3, 4, 5])
array2 = np.array([10, 20, 30, 40, 50])
header_info = "Array1,Array2"
combined_data = np.column_stack([array1, array2])
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
csv_filename = f.name
np.savetxt(csv_filename, combined_data, fmt='%d', delimiter=',', header=header_info)
print(f"CSV文件已保存到: {csv_filename}")
# 读取CSV文件
loaded_csv = np.loadtxt(csv_filename, delimiter=',', skiprows=1)
print("从CSV加载的数据:")
print(loaded_csv)
print()
# 清理临时文件
os.unlink(txt_filename)
os.unlink(csv_filename)二进制文件操作
python
import numpy as np
import tempfile
import os
# 创建测试数据
array_int = np.array([1, 2, 3, 4, 5], dtype=np.int32)
array_float = np.array([1.1, 2.2, 3.3, 4.4, 5.5], dtype=np.float64)
array_2d = np.random.random((3, 4))
print("二进制文件操作:")
print(f"整数数组: {array_int}")
print(f"浮点数组: {array_float}")
print(f"二维数组形状: {array_2d.shape}")
print()
# 保存单个数组为二进制文件
with tempfile.NamedTemporaryFile(suffix='.npy', delete=False) as f:
npy_filename = f.name
np.save(npy_filename, array_2d)
print(f"二维数组已保存到: {npy_filename}")
# 加载二进制文件
loaded_2d = np.load(npy_filename)
print(f"加载的数组形状: {loaded_2d.shape}")
print(f"数据是否相同: {np.allclose(array_2d, loaded_2d)}")
print()
# 保存多个数组到一个文件
with tempfile.NamedTemporaryFile(suffix='.npz', delete=False) as f:
npz_filename = f.name
np.savez(npz_filename,
integers=array_int,
floats=array_float,
matrix=array_2d)
print(f"多个数组已保存到: {npz_filename}")
# 加载多个数组
loaded_arrays = np.load(npz_filename)
print("加载的数组:")
for key in loaded_arrays.files:
print(f" {key}: 形状={loaded_arrays[key].shape}, 类型={loaded_arrays[key].dtype}")
print()
# 压缩保存
with tempfile.NamedTemporaryFile(suffix='.npz', delete=False) as f:
compressed_filename = f.name
np.savez_compressed(compressed_filename,
integers=array_int,
floats=array_float,
matrix=array_2d)
# 比较文件大小
original_size = os.path.getsize(npz_filename)
compressed_size = os.path.getsize(compressed_filename)
compression_ratio = original_size / compressed_size
print(f"文件大小比较:")
print(f" 原始文件: {original_size} 字节")
print(f" 压缩文件: {compressed_size} 字节")
print(f" 压缩比: {compression_ratio:.2f}")
print()
# 清理临时文件
os.unlink(npy_filename)
os.unlink(npz_filename)
os.unlink(compressed_filename)内存映射文件
python
import numpy as np
import tempfile
import os
# 创建大型数组(模拟大数据)
large_array = np.random.random((1000, 1000))
print("内存映射文件:")
print(f"大型数组形状: {large_array.shape}")
print(f"数组大小: {large_array.nbytes / 1024 / 1024:.2f} MB")
print()
# 保存为内存映射文件
with tempfile.NamedTemporaryFile(suffix='.dat', delete=False) as f:
memmap_filename = f.name
# 创建内存映射文件
memmap_array = np.memmap(memmap_filename, dtype='float64', mode='w+', shape=(1000, 1000))
memmap_array[:] = large_array[:]
memmap_array.flush() # 确保数据写入磁盘
print(f"内存映射文件已创建: {memmap_filename}")
print(f"文件大小: {os.path.getsize(memmap_filename) / 1024 / 1024:.2f} MB")
print()
# 以只读模式打开内存映射文件
readonly_memmap = np.memmap(memmap_filename, dtype='float64', mode='r', shape=(1000, 1000))
# 验证数据
print("数据验证:")
print(f"原始数组均值: {np.mean(large_array):.6f}")
print(f"内存映射均值: {np.mean(readonly_memmap):.6f}")
print(f"数据是否相同: {np.allclose(large_array, readonly_memmap)}")
print()
# 部分读取(内存映射的优势)
print("部分数据读取:")
subset = readonly_memmap[100:200, 100:200] # 只读取一小部分
print(f"子集形状: {subset.shape}")
print(f"子集均值: {np.mean(subset):.6f}")
print()
# 清理
del memmap_array, readonly_memmap # 关闭内存映射
os.unlink(memmap_filename)性能优化和内存管理
内存使用优化
python
import numpy as np
import sys
print("内存使用优化:")
# 数据类型对内存的影响
size = 1000000
array_float64 = np.ones(size, dtype=np.float64)
array_float32 = np.ones(size, dtype=np.float32)
array_int32 = np.ones(size, dtype=np.int32)
array_int16 = np.ones(size, dtype=np.int16)
array_int8 = np.ones(size, dtype=np.int8)
print(f"数组大小: {size:,} 元素")
print(f"float64: {array_float64.nbytes / 1024 / 1024:.2f} MB")
print(f"float32: {array_float32.nbytes / 1024 / 1024:.2f} MB")
print(f"int32: {array_int32.nbytes / 1024 / 1024:.2f} MB")
print(f"int16: {array_int16.nbytes / 1024 / 1024:.2f} MB")
print(f"int8: {array_int8.nbytes / 1024 / 1024:.2f} MB")
print()
# 视图 vs 副本
original = np.arange(1000000)
print(f"原始数组内存: {original.nbytes / 1024 / 1024:.2f} MB")
# 创建视图(不占用额外内存)
view = original[::2] # 每隔一个元素
print(f"视图是否共享内存: {np.shares_memory(original, view)}")
print(f"视图基础对象: {view.base is original}")
# 创建副本(占用额外内存)
copy = original[::2].copy()
print(f"副本是否共享内存: {np.shares_memory(original, copy)}")
print(f"副本基础对象: {copy.base is None}")
print()
# 就地操作 vs 创建新数组
test_array = np.random.random(100000)
original_id = id(test_array)
# 就地操作
test_array += 1
print(f"就地操作后ID相同: {id(test_array) == original_id}")
# 创建新数组的操作
test_array = test_array + 1
print(f"新数组操作后ID相同: {id(test_array) == original_id}")性能测试和优化
python
import numpy as np
import time
def time_function(func, *args, **kwargs):
"""测量函数执行时间"""
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
return result, end_time - start_time
print("性能测试和优化:")
# 创建测试数据
size = 1000000
array1 = np.random.random(size)
array2 = np.random.random(size)
print(f"测试数组大小: {size:,}")
print()
# 测试不同的计算方法
print("计算 sqrt(x² + y²) 的不同方法:")
# 方法1: 使用 np.sqrt
result1, time1 = time_function(lambda: np.sqrt(array1**2 + array2**2))
print(f"方法1 (np.sqrt): {time1:.4f}秒")
# 方法2: 使用 np.hypot
result2, time2 = time_function(lambda: np.hypot(array1, array2))
print(f"方法2 (np.hypot): {time2:.4f}秒")
# 方法3: 使用 np.linalg.norm
stacked = np.stack([array1, array2], axis=0)
result3, time3 = time_function(lambda: np.linalg.norm(stacked, axis=0))
print(f"方法3 (np.linalg.norm): {time3:.4f}秒")
print(f"结果是否相同: {np.allclose(result1, result2) and np.allclose(result2, result3)}")
print(f"最快方法: 方法{np.argmin([time1, time2, time3]) + 1}")
print()
# 内存布局对性能的影响
print("内存布局对性能的影响:")
# C风格(行优先)vs Fortran风格(列优先)
array_c = np.random.random((1000, 1000))
array_f = np.asfortranarray(array_c)
print(f"C风格数组: {array_c.flags['C_CONTIGUOUS']}")
print(f"Fortran风格数组: {array_f.flags['F_CONTIGUOUS']}")
# 按行求和
_, time_c_row = time_function(lambda: np.sum(array_c, axis=1))
_, time_f_row = time_function(lambda: np.sum(array_f, axis=1))
# 按列求和
_, time_c_col = time_function(lambda: np.sum(array_c, axis=0))
_, time_f_col = time_function(lambda: np.sum(array_f, axis=0))
print(f"C风格按行求和: {time_c_row:.4f}秒")
print(f"C风格按列求和: {time_c_col:.4f}秒")
print(f"F风格按行求和: {time_f_row:.4f}秒")
print(f"F风格按列求和: {time_f_col:.4f}秒")
print()
# 缓存友好的访问模式
print("缓存友好的访问模式:")
matrix = np.random.random((1000, 1000))
# 按行访问(缓存友好)
def row_wise_sum():
total = 0
for i in range(matrix.shape[0]):
total += np.sum(matrix[i, :])
return total
# 按列访问(缓存不友好)
def col_wise_sum():
total = 0
for j in range(matrix.shape[1]):
total += np.sum(matrix[:, j])
return total
_, time_row_wise = time_function(row_wise_sum)
_, time_col_wise = time_function(col_wise_sum)
print(f"按行访问: {time_row_wise:.4f}秒")
print(f"按列访问: {time_col_wise:.4f}秒")
print(f"性能差异: {time_col_wise/time_row_wise:.2f}x")实际应用示例
示例1:数字信号处理
python
import numpy as np
def design_fir_filter(cutoff_freq, sampling_freq, num_taps):
"""设计FIR低通滤波器"""
# 归一化截止频率
normalized_cutoff = cutoff_freq / (sampling_freq / 2)
# 生成理想低通滤波器的冲激响应
n = np.arange(num_taps)
h = np.sinc(2 * normalized_cutoff * (n - (num_taps - 1) / 2))
# 应用汉明窗
window = np.hamming(num_taps)
h = h * window
# 归一化
h = h / np.sum(h)
return h
def apply_filter(signal, filter_coeffs):
"""应用FIR滤波器"""
return np.convolve(signal, filter_coeffs, mode='same')
# 生成测试信号
fs = 1000 # 采样频率
t = np.linspace(0, 1, fs, endpoint=False)
# 组合信号:50Hz + 150Hz + 噪声
signal = (np.sin(2 * np.pi * 50 * t) +
0.5 * np.sin(2 * np.pi * 150 * t) +
0.2 * np.random.normal(0, 1, len(t)))
print("数字信号处理示例:")
print(f"采样频率: {fs} Hz")
print(f"信号长度: {len(signal)} 样本")
print(f"信号包含: 50Hz + 150Hz + 噪声")
print()
# 设计低通滤波器(截止频率100Hz)
cutoff = 100
num_taps = 101
filter_coeffs = design_fir_filter(cutoff, fs, num_taps)
print(f"FIR滤波器设计:")
print(f" 截止频率: {cutoff} Hz")
print(f" 滤波器阶数: {num_taps}")
print(f" 滤波器系数范围: [{np.min(filter_coeffs):.6f}, {np.max(filter_coeffs):.6f}]")
print()
# 应用滤波器
filtered_signal = apply_filter(signal, filter_coeffs)
# 分析滤波效果
original_power = np.mean(signal**2)
filtered_power = np.mean(filtered_signal**2)
print(f"滤波效果:")
print(f" 原始信号功率: {original_power:.4f}")
print(f" 滤波后功率: {filtered_power:.4f}")
print(f" 功率保持率: {filtered_power/original_power*100:.1f}%")
print()
# 频域分析
original_fft = np.fft.fft(signal)
filtered_fft = np.fft.fft(filtered_signal)
freqs = np.fft.fftfreq(len(signal), 1/fs)
# 计算50Hz和150Hz处的衰减
freq_50_idx = np.argmin(np.abs(freqs - 50))
freq_150_idx = np.argmin(np.abs(freqs - 150))
attenuation_50 = 20 * np.log10(np.abs(filtered_fft[freq_50_idx]) / np.abs(original_fft[freq_50_idx]))
attenuation_150 = 20 * np.log10(np.abs(filtered_fft[freq_150_idx]) / np.abs(original_fft[freq_150_idx]))
print(f"频率衰减:")
print(f" 50Hz处衰减: {attenuation_50:.1f} dB")
print(f" 150Hz处衰减: {attenuation_150:.1f} dB")示例2:图像处理进阶
python
import numpy as np
def create_test_image(size=128):
"""创建测试图像"""
x = np.linspace(-2, 2, size)
y = np.linspace(-2, 2, size)
X, Y = np.meshgrid(x, y)
# 创建复杂图案
image = (np.sin(3 * X) * np.cos(3 * Y) +
0.5 * np.sin(8 * X) * np.sin(8 * Y) +
0.3 * np.random.random((size, size)))
# 归一化到[0, 255]
image = ((image - np.min(image)) / (np.max(image) - np.min(image)) * 255).astype(np.uint8)
return image
def gaussian_kernel(size, sigma):
"""生成高斯核"""
kernel = np.zeros((size, size))
center = size // 2
for i in range(size):
for j in range(size):
x, y = i - center, j - center
kernel[i, j] = np.exp(-(x**2 + y**2) / (2 * sigma**2))
return kernel / np.sum(kernel)
def convolve_2d(image, kernel):
"""二维卷积"""
pad_h, pad_w = kernel.shape[0] // 2, kernel.shape[1] // 2
padded = np.pad(image, ((pad_h, pad_h), (pad_w, pad_w)), mode='reflect')
result = np.zeros_like(image, dtype=np.float32)
for i in range(image.shape[0]):
for j in range(image.shape[1]):
result[i, j] = np.sum(padded[i:i+kernel.shape[0], j:j+kernel.shape[1]] * kernel)
return result.astype(np.uint8)
def edge_detection(image):
"""边缘检测"""
# Sobel算子
sobel_x = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]])
sobel_y = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]])
# 计算梯度
grad_x = convolve_2d(image, sobel_x)
grad_y = convolve_2d(image, sobel_y)
# 计算梯度幅度
magnitude = np.sqrt(grad_x.astype(np.float32)**2 + grad_y.astype(np.float32)**2)
magnitude = (magnitude / np.max(magnitude) * 255).astype(np.uint8)
return magnitude
# 图像处理流水线
print("图像处理进阶示例:")
# 创建测试图像
original_image = create_test_image(64)
print(f"原始图像尺寸: {original_image.shape}")
print(f"像素值范围: [{np.min(original_image)}, {np.max(original_image)}]")
print()
# 高斯模糊
gaussian_5x5 = gaussian_kernel(5, 1.0)
blurred_image = convolve_2d(original_image, gaussian_5x5)
print(f"高斯模糊:")
print(f" 核大小: 5x5")
print(f" 标准差: 1.0")
print(f" 模糊后像素值范围: [{np.min(blurred_image)}, {np.max(blurred_image)}]")
print()
# 边缘检测
edges = edge_detection(original_image)
print(f"边缘检测:")
print(f" 边缘像素值范围: [{np.min(edges)}, {np.max(edges)}]")
print(f" 边缘像素数量: {np.sum(edges > 50)}")
print()
# 图像统计
print(f"图像统计比较:")
print(f" 原始图像均值: {np.mean(original_image):.1f}")
print(f" 模糊图像均值: {np.mean(blurred_image):.1f}")
print(f" 边缘图像均值: {np.mean(edges):.1f}")
print()
print(f" 原始图像标准差: {np.std(original_image):.1f}")
print(f" 模糊图像标准差: {np.std(blurred_image):.1f}")
print(f" 边缘图像标准差: {np.std(edges):.1f}")本章小结
在这一章中,我们探索了 NumPy 的高级功能:
- 随机数生成:各种分布的随机数、随机采样、蒙特卡洛方法
- 多项式操作:多项式计算、运算、拟合
- 傅里叶变换:一维和二维FFT、频域分析和滤波
- 文件I/O:文本文件、二进制文件、内存映射文件
- 性能优化:内存管理、缓存友好访问、向量化
- 实际应用:信号处理、图像处理等领域的应用
下一步
在下一章中,我们将学习 NumPy 的实用技巧和最佳实践,包括调试技巧、性能分析、与其他库的集成等。
练习题
- 使用蒙特卡洛方法估算定积分 ∫₀¹ x² dx
- 实现一个简单的数字滤波器设计函数
- 编写一个图像锐化滤波器
- 使用FFT实现快速卷积
- 创建一个内存高效的大数据处理函数
- 实现一个多项式回归类
- 编写一个随机游走可视化程序