
Advanced Features

Overview

This chapter covers advanced Node.js features, including clustering, worker threads, performance optimization, memory management, and advanced patterns for building scalable applications.

Clustering

Basic Cluster Implementation

javascript
// cluster-basic.js
const cluster = require('cluster');
const http = require('http');
const os = require('os');

const numCPUs = os.cpus().length;

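// Note: in Node.js 16+ the preferred name is cluster.isPrimary; isMaster remains as a deprecated alias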
if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);
  console.log(`Starting ${numCPUs} workers...`);

  // Fork workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Handle worker events
  cluster.on('online', (worker) => {
    console.log(`Worker ${worker.process.pid} is online`);
  });

  let shuttingDown = false;

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`);

    // During a graceful shutdown, do not respawn; exit once all workers are gone
    if (shuttingDown) {
      if (Object.keys(cluster.workers).length === 0) {
        process.exit(0);
      }
      return;
    }

    console.log('Starting a new worker...');
    cluster.fork();
  });

  // Graceful shutdown
  process.on('SIGTERM', () => {
    console.log('Master received SIGTERM, shutting down gracefully...');
    shuttingDown = true;

    for (const id in cluster.workers) {
      cluster.workers[id].kill();
    }
  });

} else {
  // Worker process
  const server = http.createServer((req, res) => {
    // Simulate some work
    const start = Date.now();
    while (Date.now() - start < 100) {
      // CPU intensive task
    }

    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({
      message: 'Hello from worker',
      pid: process.pid,
      timestamp: new Date().toISOString()
    }));
  });

  server.listen(3000, () => {
    console.log(`Worker ${process.pid} started server on port 3000`);
  });

  // Handle worker shutdown
  process.on('SIGTERM', () => {
    console.log(`Worker ${process.pid} received SIGTERM, shutting down...`);
    server.close(() => {
      process.exit(0);
    });
  });
}

Advanced Cluster Management

javascript
// cluster-advanced.js
const cluster = require('cluster');
const express = require('express');
const os = require('os');

class ClusterManager {
  constructor(options = {}) {
    this.numWorkers = options.numWorkers || os.cpus().length;
    this.restartDelay = options.restartDelay || 1000;
    this.maxRestarts = options.maxRestarts || 10;
    this.workerRestarts = new Map();
    this.gracefulShutdownTimeout = options.gracefulShutdownTimeout || 30000;
    this.isShuttingDown = false;
  }

  start(workerFunction) {
    if (cluster.isMaster) {
      this.startMaster(workerFunction);
    } else {
      this.startWorker(workerFunction);
    }
  }

  startMaster(workerFunction) {
    console.log(`Master ${process.pid} starting with ${this.numWorkers} workers`);

    // Fork workers
    for (let i = 0; i < this.numWorkers; i++) {
      this.forkWorker();
    }

    // Handle worker events
    cluster.on('online', (worker) => {
      console.log(`Worker ${worker.process.pid} online`);
      this.workerRestarts.set(worker.id, 0);
    });

    cluster.on('exit', (worker, code, signal) => {
      console.log(`Worker ${worker.process.pid} died (${signal || code})`);
      this.handleWorkerExit(worker);
    });

    // Handle master shutdown
    this.setupGracefulShutdown();

    // Monitor worker health
    this.startHealthMonitoring();
  }

  forkWorker() {
    const worker = cluster.fork();
    
    // Set up worker communication
    worker.on('message', (message) => {
      this.handleWorkerMessage(worker, message);
    });

    return worker;
  }

  handleWorkerExit(worker) {
    // Do not restart workers once a graceful shutdown has started
    if (this.isShuttingDown) return;

    const restarts = this.workerRestarts.get(worker.id) || 0;
    
    if (restarts < this.maxRestarts) {
      console.log(`Restarting worker ${worker.id} (restart ${restarts + 1}/${this.maxRestarts})`);
      
      setTimeout(() => {
        const newWorker = this.forkWorker();
        this.workerRestarts.set(newWorker.id, restarts + 1);
      }, this.restartDelay);
    } else {
      console.error(`Worker ${worker.id} exceeded max restarts, not restarting`);
    }
  }

  handleWorkerMessage(worker, message) {
    switch (message.type) {
      case 'health':
        console.log(`Worker ${worker.process.pid} health:`, message.data);
        break;
      case 'error':
        console.error(`Worker ${worker.process.pid} error:`, message.data);
        break;
      case 'metrics':
        this.handleWorkerMetrics(worker, message.data);
        break;
    }
  }

  handleWorkerMetrics(worker, metrics) {
    // Log or store worker metrics
    console.log(`Worker ${worker.process.pid} metrics:`, {
      memory: metrics.memory,
      cpu: metrics.cpu,
      requests: metrics.requests
    });
  }

  startHealthMonitoring() {
    setInterval(() => {
      for (const id in cluster.workers) {
        const worker = cluster.workers[id];
        worker.send({ type: 'health-check' });
      }
    }, 30000); // Check every 30 seconds
  }

  setupGracefulShutdown() {
    const shutdown = (signal) => {
      console.log(`Master received ${signal}, initiating graceful shutdown...`);
      this.isShuttingDown = true;

      // Ask all workers to shut down gracefully
      for (const id in cluster.workers) {
        cluster.workers[id].send({ type: 'shutdown' });
      }

      // Force shutdown after timeout
      setTimeout(() => {
        console.log('Force shutdown after timeout');
        process.exit(1);
      }, this.gracefulShutdownTimeout);

      // Wait for all workers to exit
      cluster.on('exit', () => {
        if (Object.keys(cluster.workers).length === 0) {
          console.log('All workers exited, master shutting down');
          process.exit(0);
        }
      });
    };

    process.on('SIGTERM', () => shutdown('SIGTERM'));
    process.on('SIGINT', () => shutdown('SIGINT'));
  }

  startWorker(workerFunction) {
    const app = express();
    
    // Add worker metrics middleware
    app.use(this.createMetricsMiddleware());
    
    // Set up worker message handling
    process.on('message', (message) => {
      this.handleMasterMessage(message);
    });

    // Start the worker application
    workerFunction(app);

    // Send health updates
    this.startWorkerHealthReporting();
  }

  createMetricsMiddleware() {
    let requestCount = 0;
    
    return (req, res, next) => {
      requestCount++;
      
      const start = Date.now();
      
      res.on('finish', () => {
        const duration = Date.now() - start;
        
        // Send metrics to master periodically
        if (requestCount % 100 === 0) {
          process.send({
            type: 'metrics',
            data: {
              requests: requestCount,
              memory: process.memoryUsage(),
              cpu: process.cpuUsage(),
              lastResponseTime: duration
            }
          });
        }
      });
      
      next();
    };
  }

  handleMasterMessage(message) {
    switch (message.type) {
      case 'health-check':
        process.send({
          type: 'health',
          data: {
            pid: process.pid,
            memory: process.memoryUsage(),
            uptime: process.uptime()
          }
        });
        break;
      case 'shutdown':
        this.gracefulWorkerShutdown();
        break;
    }
  }

  startWorkerHealthReporting() {
    setInterval(() => {
      process.send({
        type: 'health',
        data: {
          pid: process.pid,
          memory: process.memoryUsage(),
          uptime: process.uptime()
        }
      });
    }, 60000); // Report every minute
  }

  gracefulWorkerShutdown() {
    console.log(`Worker ${process.pid} starting graceful shutdown...`);
    
    // In a real application, close the HTTP server and drain in-flight
    // requests here before exiting
    process.exit(0);
  }
}

// Usage
const clusterManager = new ClusterManager({
  numWorkers: 4,
  maxRestarts: 5,
  gracefulShutdownTimeout: 30000
});

clusterManager.start((app) => {
  app.get('/', (req, res) => {
    res.json({
      message: 'Hello from clustered app',
      pid: process.pid,
      timestamp: new Date().toISOString()
    });
  });

  app.listen(3000, () => {
    console.log(`Worker ${process.pid} listening on port 3000`);
  });
});

Worker Threads

Basic Worker Threads

javascript
// worker-threads-basic.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
  // Main thread
  console.log('Main thread starting...');

  // CPU-intensive task
  function fibonacci(n) {
    if (n < 2) return n;
    return fibonacci(n - 1) + fibonacci(n - 2);
  }

  // Run in main thread (blocking)
  console.time('Main thread fibonacci');
  const result1 = fibonacci(40);
  console.timeEnd('Main thread fibonacci');
  console.log('Main thread result:', result1);

  // Run in worker thread (non-blocking)
  console.time('Worker thread fibonacci');
  const worker = new Worker(__filename, {
    workerData: { number: 40 }
  });

  worker.on('message', (result) => {
    console.timeEnd('Worker thread fibonacci');
    console.log('Worker thread result:', result);
    worker.terminate();
  });

  worker.on('error', (error) => {
    console.error('Worker error:', error);
  });

  worker.on('exit', (code) => {
    if (code !== 0) {
      console.error(`Worker stopped with exit code ${code}`);
    }
  });

} else {
  // Worker thread
  function fibonacci(n) {
    if (n < 2) return n;
    return fibonacci(n - 1) + fibonacci(n - 2);
  }

  const result = fibonacci(workerData.number);
  parentPort.postMessage(result);
}

Worker Thread Pool

javascript
// worker-pool.js
const { Worker } = require('worker_threads');
const path = require('path');
const EventEmitter = require('events');

class WorkerPool extends EventEmitter {
  constructor(workerScript, poolSize = 4) {
    super();
    this.workerScript = workerScript;
    this.poolSize = poolSize;
    this.workers = [];
    this.queue = [];
    this.activeJobs = new Map();
    this.terminating = false;
    
    this.createWorkers();
  }

  createWorkers() {
    for (let i = 0; i < this.poolSize; i++) {
      this.createWorker();
    }
  }

  createWorker() {
    const worker = new Worker(this.workerScript);
    
    worker.on('message', (result) => {
      this.handleWorkerMessage(worker, result);
    });

    worker.on('error', (error) => {
      console.error('Worker error:', error);
      this.handleWorkerError(worker, error);
    });

    worker.on('exit', (code) => {
      if (code !== 0) {
        console.error(`Worker exited with code ${code}`);
      }
      this.removeWorker(worker);
    });

    worker.isAvailable = true;
    this.workers.push(worker);
  }

  handleWorkerMessage(worker, result) {
    const job = this.activeJobs.get(worker);
    
    if (job) {
      // The worker script replies with { success, data } or { success, error }
      if (result.success) {
        job.resolve(result.data);
      } else {
        job.reject(new Error(result.error));
      }
      this.activeJobs.delete(worker);
      worker.isAvailable = true;
      
      // Process next job in queue
      this.processQueue();
    }
  }

  handleWorkerError(worker, error) {
    const job = this.activeJobs.get(worker);
    
    if (job) {
      job.reject(error);
      this.activeJobs.delete(worker);
    }
    
    // Do not reuse this worker: an 'exit' event usually follows and
    // removeWorker() will spawn a replacement
    worker.isAvailable = false;
    this.processQueue();
  }

  removeWorker(worker) {
    const index = this.workers.indexOf(worker);
    if (index > -1) {
      this.workers.splice(index, 1);
    }
    
    // Create a replacement worker unless the pool is shutting down
    if (!this.terminating) {
      this.createWorker();
      this.processQueue();
    }
  }

  execute(data) {
    return new Promise((resolve, reject) => {
      const job = { data, resolve, reject };
      
      const availableWorker = this.workers.find(w => w.isAvailable);
      
      if (availableWorker) {
        this.assignJob(availableWorker, job);
      } else {
        this.queue.push(job);
      }
    });
  }

  assignJob(worker, job) {
    worker.isAvailable = false;
    this.activeJobs.set(worker, job);
    worker.postMessage(job.data);
  }

  processQueue() {
    if (this.queue.length === 0) return;
    
    const availableWorker = this.workers.find(w => w.isAvailable);
    
    if (availableWorker) {
      const job = this.queue.shift();
      this.assignJob(availableWorker, job);
    }
  }

  getStats() {
    return {
      poolSize: this.poolSize,
      activeWorkers: this.workers.filter(w => !w.isAvailable).length,
      queueLength: this.queue.length,
      totalWorkers: this.workers.length
    };
  }

  async terminate() {
    this.terminating = true;
    const terminationPromises = this.workers.map(worker => worker.terminate());
    await Promise.all(terminationPromises);
    this.workers = [];
    this.queue = [];
    this.activeJobs.clear();
  }
}

// Worker script (save as worker-task.js)
const workerTaskScript = `
const { parentPort } = require('worker_threads');

function heavyComputation(data) {
  // Simulate CPU-intensive task
  let result = 0;
  for (let i = 0; i < data.iterations; i++) {
    result += Math.sqrt(i);
  }
  return result;
}

parentPort.on('message', (data) => {
  try {
    const result = heavyComputation(data);
    parentPort.postMessage({ success: true, data: result });
  } catch (error) {
    parentPort.postMessage({ success: false, error: error.message });
  }
});
`;

// Usage example
async function demonstrateWorkerPool() {
  // Create worker script file
  const fs = require('fs');
  const workerScriptPath = path.join(__dirname, 'worker-task.js');
  fs.writeFileSync(workerScriptPath, workerTaskScript);

  const pool = new WorkerPool(workerScriptPath, 4);

  try {
    console.log('Starting worker pool tasks...');
    
    const tasks = Array.from({ length: 10 }, (_, i) => ({
      id: i,
      iterations: 1000000 + (i * 100000)
    }));

    const startTime = Date.now();
    
    const results = await Promise.all(
      tasks.map(task => pool.execute(task))
    );

    const endTime = Date.now();
    
    console.log(`Completed ${tasks.length} tasks in ${endTime - startTime}ms`);
    console.log('Pool stats:', pool.getStats());
    
    await pool.terminate();
    
    // Clean up
    fs.unlinkSync(workerScriptPath);
    
  } catch (error) {
    console.error('Worker pool error:', error);
  }
}

module.exports = WorkerPool;

Performance Optimization

Memory Management

javascript
// memory-optimization.js
const v8 = require('v8');

class MemoryManager {
  constructor() {
    this.memoryThreshold = 0.8; // 80% of heap limit
    this.gcInterval = 30000; // 30 seconds
    this.monitoring = false;
  }

  startMonitoring() {
    if (this.monitoring) return;
    
    this.monitoring = true;
    console.log('Memory monitoring started');
    
    this.monitoringInterval = setInterval(() => {
      this.checkMemoryUsage();
    }, this.gcInterval);
  }

  stopMonitoring() {
    if (this.monitoringInterval) {
      clearInterval(this.monitoringInterval);
      this.monitoring = false;
      console.log('Memory monitoring stopped');
    }
  }

  checkMemoryUsage() {
    const memUsage = process.memoryUsage();
    const heapStats = v8.getHeapStatistics();
    
    const heapUsedPercent = memUsage.heapUsed / heapStats.heap_size_limit;
    
    console.log('Memory usage:', {
      heapUsed: this.formatBytes(memUsage.heapUsed),
      heapTotal: this.formatBytes(memUsage.heapTotal),
      external: this.formatBytes(memUsage.external),
      rss: this.formatBytes(memUsage.rss),
      heapUsedPercent: (heapUsedPercent * 100).toFixed(2) + '%'
    });

    if (heapUsedPercent > this.memoryThreshold) {
      console.warn('Memory usage high, triggering garbage collection');
      this.forceGarbageCollection();
    }
  }

  forceGarbageCollection() {
    if (global.gc) {
      global.gc();
      console.log('Garbage collection completed');
    } else {
      console.warn('Garbage collection not available. Start with --expose-gc flag');
    }
  }

  getMemoryStats() {
    const memUsage = process.memoryUsage();
    const heapStats = v8.getHeapStatistics();
    
    return {
      process: {
        rss: memUsage.rss,
        heapTotal: memUsage.heapTotal,
        heapUsed: memUsage.heapUsed,
        external: memUsage.external,
        arrayBuffers: memUsage.arrayBuffers
      },
      v8: {
        totalHeapSize: heapStats.total_heap_size,
        totalHeapSizeExecutable: heapStats.total_heap_size_executable,
        totalPhysicalSize: heapStats.total_physical_size,
        totalAvailableSize: heapStats.total_available_size,
        usedHeapSize: heapStats.used_heap_size,
        heapSizeLimit: heapStats.heap_size_limit,
        mallocedMemory: heapStats.malloced_memory,
        peakMallocedMemory: heapStats.peak_malloced_memory
      }
    };
  }

  formatBytes(bytes) {
    const sizes = ['Bytes', 'KB', 'MB', 'GB'];
    if (bytes === 0) return '0 Bytes';
    const i = Math.floor(Math.log(bytes) / Math.log(1024));
    return Math.round(bytes / Math.pow(1024, i) * 100) / 100 + ' ' + sizes[i];
  }

  // Memory leak detection
  detectMemoryLeaks() {
    const initialMemory = process.memoryUsage().heapUsed;
    let measurements = [];
    
    const measureInterval = setInterval(() => {
      const currentMemory = process.memoryUsage().heapUsed;
      measurements.push(currentMemory);
      
      if (measurements.length > 10) {
        measurements.shift(); // Keep only last 10 measurements
      }
      
      // Check for consistent memory growth
      if (measurements.length === 10) {
        const trend = this.calculateTrend(measurements);
        
        if (trend > 0.1) { // 10% growth trend
          console.warn('Potential memory leak detected:', {
            initialMemory: this.formatBytes(initialMemory),
            currentMemory: this.formatBytes(currentMemory),
            growthTrend: (trend * 100).toFixed(2) + '%'
          });
        }
      }
    }, 5000);
    
    // Stop after 5 minutes
    setTimeout(() => {
      clearInterval(measureInterval);
    }, 300000);
  }

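  // Least-squares slope of the samples (x = sample index), normalized by the
  // mean value, so the result is a fractional growth rate per sample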
  calculateTrend(values) {
    const n = values.length;
    const sumX = n * (n - 1) / 2;
    const sumY = values.reduce((a, b) => a + b, 0);
    const sumXY = values.reduce((sum, y, x) => sum + x * y, 0);
    const sumXX = n * (n - 1) * (2 * n - 1) / 6;
    
    const slope = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
    return slope / (sumY / n); // Normalize by average
  }
}

module.exports = MemoryManager;

Next Steps

In the next chapter, we will explore testing strategies and frameworks for Node.js applications.

Key Takeaways

  • Clustering enables horizontal scaling across CPU cores
  • Worker threads handle CPU-intensive tasks without blocking the main thread
  • Memory management prevents performance degradation and crashes
  • Performance monitoring helps identify bottlenecks (see the sketch after this list)
  • Graceful shutdown preserves data integrity during restarts
  • Load balancing distributes work efficiently across resources
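The takeaway about performance monitoring is only covered indirectly in this chapter (memory statistics and worker metrics). Below is a minimal sketch of watching event-loop delay with Node's built-in perf_hooks module; the file name, the 20 ms sampling resolution, the 5-second reporting interval, and the 100 ms warning threshold are illustrative choices, not recommendations.

javascript
// event-loop-monitor.js
const { monitorEventLoopDelay } = require('perf_hooks');

// Sample event-loop delay with a 20 ms resolution
const histogram = monitorEventLoopDelay({ resolution: 20 });
histogram.enable();

setInterval(() => {
  // Histogram values are reported in nanoseconds; convert to milliseconds
  const mean = histogram.mean / 1e6;
  const p99 = histogram.percentile(99) / 1e6;
  const max = histogram.max / 1e6;

  console.log(`Event loop delay - mean: ${mean.toFixed(2)}ms, p99: ${p99.toFixed(2)}ms, max: ${max.toFixed(2)}ms`);

  if (p99 > 100) {
    // Sustained delay usually means CPU-bound work on the main thread;
    // consider moving it to a worker thread or another process
    console.warn('High event loop delay detected');
  }

  histogram.reset();
}, 5000);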
