高级特性
概述
本章介绍 Node.js 的高级特性,包括集群、工作线程、性能优化、内存管理以及构建可扩展应用程序的高级模式。
集群
基础集群实现
javascript
// cluster-basic.js
const cluster = require('cluster');
const http = require('http');
const os = require('os');
const numCPUs = os.cpus().length;
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
console.log(`Starting ${numCPUs} workers...`);
// Fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// Handle worker events
cluster.on('online', (worker) => {
console.log(`Worker ${worker.process.pid} is online`);
});
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`);
console.log('Starting a new worker...');
cluster.fork();
});
// Graceful shutdown
process.on('SIGTERM', () => {
console.log('Master received SIGTERM, shutting down gracefully...');
for (const id in cluster.workers) {
cluster.workers[id].kill();
}
});
} else {
// Worker process
const server = http.createServer((req, res) => {
// Simulate some work
const start = Date.now();
while (Date.now() - start < 100) {
// CPU intensive task
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
message: 'Hello from worker',
pid: process.pid,
timestamp: new Date().toISOString()
}));
});
server.listen(3000, () => {
console.log(`Worker ${process.pid} started server on port 3000`);
});
// Handle worker shutdown
process.on('SIGTERM', () => {
console.log(`Worker ${process.pid} received SIGTERM, shutting down...`);
server.close(() => {
process.exit(0);
});
});
}高级集群管理
javascript
// cluster-advanced.js
const cluster = require('cluster');
const express = require('express');
const os = require('os');
class ClusterManager {
constructor(options = {}) {
this.numWorkers = options.numWorkers || os.cpus().length;
this.restartDelay = options.restartDelay || 1000;
this.maxRestarts = options.maxRestarts || 10;
this.workerRestarts = new Map();
this.gracefulShutdownTimeout = options.gracefulShutdownTimeout || 30000;
}
start(workerFunction) {
if (cluster.isMaster) {
this.startMaster(workerFunction);
} else {
this.startWorker(workerFunction);
}
}
startMaster(workerFunction) {
console.log(`Master ${process.pid} starting with ${this.numWorkers} workers`);
// Fork workers
for (let i = 0; i < this.numWorkers; i++) {
this.forkWorker();
}
// Handle worker events
cluster.on('online', (worker) => {
console.log(`Worker ${worker.process.pid} online`);
this.workerRestarts.set(worker.id, 0);
});
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died (${signal || code})`);
this.handleWorkerExit(worker);
});
// Handle master shutdown
this.setupGracefulShutdown();
// Monitor worker health
this.startHealthMonitoring();
}
forkWorker() {
const worker = cluster.fork();
// Set up worker communication
worker.on('message', (message) => {
this.handleWorkerMessage(worker, message);
});
return worker;
}
handleWorkerExit(worker) {
const restarts = this.workerRestarts.get(worker.id) || 0;
if (restarts < this.maxRestarts) {
console.log(`Restarting worker ${worker.id} (restart ${restarts + 1}/${this.maxRestarts})`);
setTimeout(() => {
const newWorker = this.forkWorker();
this.workerRestarts.set(newWorker.id, restarts + 1);
}, this.restartDelay);
} else {
console.error(`Worker ${worker.id} exceeded max restarts, not restarting`);
}
}
handleWorkerMessage(worker, message) {
switch (message.type) {
case 'health':
console.log(`Worker ${worker.process.pid} health:`, message.data);
break;
case 'error':
console.error(`Worker ${worker.process.pid} error:`, message.data);
break;
case 'metrics':
this.handleWorkerMetrics(worker, message.data);
break;
}
}
handleWorkerMetrics(worker, metrics) {
// Log or store worker metrics
console.log(`Worker ${worker.process.pid} metrics:`, {
memory: metrics.memory,
cpu: metrics.cpu,
requests: metrics.requests
});
}
startHealthMonitoring() {
setInterval(() => {
for (const id in cluster.workers) {
const worker = cluster.workers[id];
worker.send({ type: 'health-check' });
}
}, 30000); // Check every 30 seconds
}
setupGracefulShutdown() {
const shutdown = (signal) => {
console.log(`Master received ${signal}, initiating graceful shutdown...`);
// Stop accepting new connections
for (const id in cluster.workers) {
cluster.workers[id].send({ type: 'shutdown' });
}
// Force shutdown after timeout
setTimeout(() => {
console.log('Force shutdown after timeout');
process.exit(1);
}, this.gracefulShutdownTimeout);
// Wait for all workers to exit
cluster.on('exit', () => {
if (Object.keys(cluster.workers).length === 0) {
console.log('All workers exited, master shutting down');
process.exit(0);
}
});
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
}
startWorker(workerFunction) {
const app = express();
// Add worker metrics middleware
app.use(this.createMetricsMiddleware());
// Set up worker message handling
process.on('message', (message) => {
this.handleMasterMessage(message);
});
// Start the worker application
workerFunction(app);
// Send health updates
this.startWorkerHealthReporting();
}
createMetricsMiddleware() {
let requestCount = 0;
return (req, res, next) => {
requestCount++;
const start = Date.now();
res.on('finish', () => {
const duration = Date.now() - start;
// Send metrics to master periodically
if (requestCount % 100 === 0) {
process.send({
type: 'metrics',
data: {
requests: requestCount,
memory: process.memoryUsage(),
cpu: process.cpuUsage(),
averageResponseTime: duration
}
});
}
});
next();
};
}
handleMasterMessage(message) {
switch (message.type) {
case 'health-check':
process.send({
type: 'health',
data: {
pid: process.pid,
memory: process.memoryUsage(),
uptime: process.uptime()
}
});
break;
case 'shutdown':
this.gracefulWorkerShutdown();
break;
}
}
startWorkerHealthReporting() {
setInterval(() => {
process.send({
type: 'health',
data: {
pid: process.pid,
memory: process.memoryUsage(),
uptime: process.uptime()
}
});
}, 60000); // Report every minute
}
gracefulWorkerShutdown() {
console.log(`Worker ${process.pid} starting graceful shutdown...`);
// Stop accepting new requests
process.exit(0);
}
}
// Usage
const clusterManager = new ClusterManager({
numWorkers: 4,
maxRestarts: 5,
gracefulShutdownTimeout: 30000
});
clusterManager.start((app) => {
app.get('/', (req, res) => {
res.json({
message: 'Hello from clustered app',
pid: process.pid,
timestamp: new Date().toISOString()
});
});
app.listen(3000, () => {
console.log(`Worker ${process.pid} listening on port 3000`);
});
});工作线程
基础工作线程
javascript
// worker-threads-basic.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const path = require('path');
if (isMainThread) {
// Main thread
console.log('Main thread starting...');
// CPU-intensive task
function fibonacci(n) {
if (n < 2) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
// Run in main thread (blocking)
console.time('Main thread fibonacci');
const result1 = fibonacci(40);
console.timeEnd('Main thread fibonacci');
console.log('Main thread result:', result1);
// Run in worker thread (non-blocking)
console.time('Worker thread fibonacci');
const worker = new Worker(__filename, {
workerData: { number: 40 }
});
worker.on('message', (result) => {
console.timeEnd('Worker thread fibonacci');
console.log('Worker thread result:', result);
worker.terminate();
});
worker.on('error', (error) => {
console.error('Worker error:', error);
});
worker.on('exit', (code) => {
if (code !== 0) {
console.error(`Worker stopped with exit code ${code}`);
}
});
} else {
// Worker thread
function fibonacci(n) {
if (n < 2) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
const result = fibonacci(workerData.number);
parentPort.postMessage(result);
}工作线程池
javascript
// worker-pool.js
const { Worker } = require('worker_threads');
const path = require('path');
const EventEmitter = require('events');
class WorkerPool extends EventEmitter {
constructor(workerScript, poolSize = 4) {
super();
this.workerScript = workerScript;
this.poolSize = poolSize;
this.workers = [];
this.queue = [];
this.activeJobs = new Map();
this.createWorkers();
}
createWorkers() {
for (let i = 0; i < this.poolSize; i++) {
this.createWorker();
}
}
createWorker() {
const worker = new Worker(this.workerScript);
worker.on('message', (result) => {
this.handleWorkerMessage(worker, result);
});
worker.on('error', (error) => {
console.error('Worker error:', error);
this.handleWorkerError(worker, error);
});
worker.on('exit', (code) => {
if (code !== 0) {
console.error(`Worker exited with code ${code}`);
}
this.removeWorker(worker);
});
worker.isAvailable = true;
this.workers.push(worker);
}
handleWorkerMessage(worker, result) {
const job = this.activeJobs.get(worker);
if (job) {
job.resolve(result.data);
this.activeJobs.delete(worker);
worker.isAvailable = true;
// Process next job in queue
this.processQueue();
}
}
handleWorkerError(worker, error) {
const job = this.activeJobs.get(worker);
if (job) {
job.reject(error);
this.activeJobs.delete(worker);
}
worker.isAvailable = true;
this.processQueue();
}
removeWorker(worker) {
const index = this.workers.indexOf(worker);
if (index > -1) {
this.workers.splice(index, 1);
}
// Create replacement worker
this.createWorker();
}
execute(data) {
return new Promise((resolve, reject) => {
const job = { data, resolve, reject };
const availableWorker = this.workers.find(w => w.isAvailable);
if (availableWorker) {
this.assignJob(availableWorker, job);
} else {
this.queue.push(job);
}
});
}
assignJob(worker, job) {
worker.isAvailable = false;
this.activeJobs.set(worker, job);
worker.postMessage(job.data);
}
processQueue() {
if (this.queue.length === 0) return;
const availableWorker = this.workers.find(w => w.isAvailable);
if (availableWorker) {
const job = this.queue.shift();
this.assignJob(availableWorker, job);
}
}
getStats() {
return {
poolSize: this.poolSize,
activeWorkers: this.workers.filter(w => !w.isAvailable).length,
queueLength: this.queue.length,
totalWorkers: this.workers.length
};
}
async terminate() {
const terminationPromises = this.workers.map(worker => worker.terminate());
await Promise.all(terminationPromises);
this.workers = [];
this.queue = [];
this.activeJobs.clear();
}
}
// Worker script (save as worker-task.js)
const workerTaskScript = `
const { parentPort } = require('worker_threads');
function heavyComputation(data) {
// Simulate CPU-intensive task
let result = 0;
for (let i = 0; i < data.iterations; i++) {
result += Math.sqrt(i);
}
return result;
}
parentPort.on('message', (data) => {
try {
const result = heavyComputation(data);
parentPort.postMessage({ success: true, data: result });
} catch (error) {
parentPort.postMessage({ success: false, error: error.message });
}
});
`;
// Usage example
async function demonstrateWorkerPool() {
// Create worker script file
const fs = require('fs');
const workerScriptPath = path.join(__dirname, 'worker-task.js');
fs.writeFileSync(workerScriptPath, workerTaskScript);
const pool = new WorkerPool(workerScriptPath, 4);
try {
console.log('Starting worker pool tasks...');
const tasks = Array.from({ length: 10 }, (_, i) => ({
id: i,
iterations: 1000000 + (i * 100000)
}));
const startTime = Date.now();
const results = await Promise.all(
tasks.map(task => pool.execute(task))
);
const endTime = Date.now();
console.log(`Completed ${tasks.length} tasks in ${endTime - startTime}ms`);
console.log('Pool stats:', pool.getStats());
await pool.terminate();
// Clean up
fs.unlinkSync(workerScriptPath);
} catch (error) {
console.error('Worker pool error:', error);
}
}
module.exports = WorkerPool;性能优化
内存管理
javascript
// memory-optimization.js
const v8 = require('v8');
class MemoryManager {
constructor() {
this.memoryThreshold = 0.8; // 80% of heap limit
this.gcInterval = 30000; // 30 seconds
this.monitoring = false;
}
startMonitoring() {
if (this.monitoring) return;
this.monitoring = true;
console.log('Memory monitoring started');
this.monitoringInterval = setInterval(() => {
this.checkMemoryUsage();
}, this.gcInterval);
}
stopMonitoring() {
if (this.monitoringInterval) {
clearInterval(this.monitoringInterval);
this.monitoring = false;
console.log('Memory monitoring stopped');
}
}
checkMemoryUsage() {
const memUsage = process.memoryUsage();
const heapStats = v8.getHeapStatistics();
const heapUsedPercent = memUsage.heapUsed / heapStats.heap_size_limit;
console.log('Memory usage:', {
heapUsed: this.formatBytes(memUsage.heapUsed),
heapTotal: this.formatBytes(memUsage.heapTotal),
external: this.formatBytes(memUsage.external),
rss: this.formatBytes(memUsage.rss),
heapUsedPercent: (heapUsedPercent * 100).toFixed(2) + '%'
});
if (heapUsedPercent > this.memoryThreshold) {
console.warn('Memory usage high, triggering garbage collection');
this.forceGarbageCollection();
}
}
forceGarbageCollection() {
if (global.gc) {
global.gc();
console.log('Garbage collection completed');
} else {
console.warn('Garbage collection not available. Start with --expose-gc flag');
}
}
getMemoryStats() {
const memUsage = process.memoryUsage();
const heapStats = v8.getHeapStatistics();
return {
process: {
rss: memUsage.rss,
heapTotal: memUsage.heapTotal,
heapUsed: memUsage.heapUsed,
external: memUsage.external,
arrayBuffers: memUsage.arrayBuffers
},
v8: {
totalHeapSize: heapStats.total_heap_size,
totalHeapSizeExecutable: heapStats.total_heap_size_executable,
totalPhysicalSize: heapStats.total_physical_size,
totalAvailableSize: heapStats.total_available_size,
usedHeapSize: heapStats.used_heap_size,
heapSizeLimit: heapStats.heap_size_limit,
mallocedMemory: heapStats.malloced_memory,
peakMallocedMemory: heapStats.peak_malloced_memory
}
};
}
formatBytes(bytes) {
const sizes = ['Bytes', 'KB', 'MB', 'GB'];
if (bytes === 0) return '0 Bytes';
const i = Math.floor(Math.log(bytes) / Math.log(1024));
return Math.round(bytes / Math.pow(1024, i) * 100) / 100 + ' ' + sizes[i];
}
// Memory leak detection
detectMemoryLeaks() {
const initialMemory = process.memoryUsage().heapUsed;
let measurements = [];
const measureInterval = setInterval(() => {
const currentMemory = process.memoryUsage().heapUsed;
measurements.push(currentMemory);
if (measurements.length > 10) {
measurements.shift(); // Keep only last 10 measurements
}
// Check for consistent memory growth
if (measurements.length === 10) {
const trend = this.calculateTrend(measurements);
if (trend > 0.1) { // 10% growth trend
console.warn('Potential memory leak detected:', {
initialMemory: this.formatBytes(initialMemory),
currentMemory: this.formatBytes(currentMemory),
growthTrend: (trend * 100).toFixed(2) + '%'
});
}
}
}, 5000);
// Stop after 5 minutes
setTimeout(() => {
clearInterval(measureInterval);
}, 300000);
}
calculateTrend(values) {
const n = values.length;
const sumX = n * (n - 1) / 2;
const sumY = values.reduce((a, b) => a + b, 0);
const sumXY = values.reduce((sum, y, x) => sum + x * y, 0);
const sumXX = n * (n - 1) * (2 * n - 1) / 6;
const slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
return slope / (sumY / n); // Normalize by average
}
}
module.exports = MemoryManager;下一步
在下一章中,我们将探索 Node.js 应用程序的测试策略和框架。
关键要点
- 集群支持跨 CPU 核心的水平扩展
- 工作线程在不阻塞主线程的情况下处理 CPU 密集型任务
- 内存管理防止性能下降和崩溃
- 性能监控有助于识别瓶颈
- 优雅关闭确保重启期间的数据完整性
- 负载均衡在资源之间高效分配工作