Skip to content

核心概念

概述

本章介绍使 Node.js 独特而强大的基本概念:事件驱动编程、事件发射器模式、回调、Promise、async/await、流、内存管理、错误处理策略,以及 Node.js 事件循环的相关内容。

事件驱动编程

理解事件

Node.js 是围绕事件构建的。Node.js 中的许多对象都会发出事件,你可以监听这些事件并做出相应的响应:

javascript
// basic-events.js
const EventEmitter = require('events');

// Build one emitter and attach a handler per event name; on() returns the
// emitter itself, so the registrations can be chained.
const myEmitter = new EventEmitter();

myEmitter
  .on('message', (data) => {
    // The payload is printed as-is, whatever its type
    console.log('Received message:', data);
  })
  .on('error', (error) => {
    // Only the message text of the Error is printed
    console.error('Error occurred:', error.message);
  });

// Emit two 'message' events (a string, then an object), followed by an 'error'
const payloads = ['Hello, World!', { user: 'John', text: 'How are you?' }];
for (const payload of payloads) {
  myEmitter.emit('message', payload);
}
myEmitter.emit('error', new Error('Something went wrong'));

自定义事件发射器

创建继承 EventEmitter 的自定义类:

javascript
// custom-emitter.js
const EventEmitter = require('events');

/**
 * In-memory chat room that reports membership changes and messages as events.
 *
 * Events emitted:
 *  - 'userJoined' / 'userLeft' with `{ username, room }`
 *  - 'message' with the stored message object
 *  - 'error' with an Error when an operation is rejected. As with any
 *    EventEmitter, emitting 'error' while no listener is attached throws.
 */
class ChatRoom extends EventEmitter {
  /**
   * @param {string} name Room name, included in join/leave event payloads.
   */
  constructor(name) {
    super();
    this.name = name;
    this.users = new Set();
    this.messages = [];
    // Last id handed out by sendMessage(); used to keep ids unique
    this.lastMessageId = 0;
  }

  /**
   * Add a user to the room.
   * @param {string} username
   * @returns {boolean} true when added, false (after emitting 'error') when
   *   the name is already present.
   */
  addUser(username) {
    if (this.users.has(username)) {
      this.emit('error', new Error('User already exists'));
      return false;
    }

    this.users.add(username);
    this.emit('userJoined', { username, room: this.name });
    return true;
  }

  /**
   * Remove a user from the room.
   * @param {string} username
   * @returns {boolean} true when removed, false (after emitting 'error') when
   *   the user is not in the room.
   */
  removeUser(username) {
    if (!this.users.has(username)) {
      this.emit('error', new Error('User not found'));
      return false;
    }

    this.users.delete(username);
    this.emit('userLeft', { username, room: this.name });
    return true;
  }

  /**
   * Store a message from a member and emit it as a 'message' event.
   * @param {string} username Sender; must already be in the room.
   * @param {string} message Message text.
   * @returns {boolean} true when stored, false (after emitting 'error') when
   *   the sender is not in the room.
   */
  sendMessage(username, message) {
    if (!this.users.has(username)) {
      this.emit('error', new Error('User not in room'));
      return false;
    }

    // Date.now() alone repeats for messages sent within the same millisecond,
    // so always step past the previously issued id.
    this.lastMessageId = Math.max(Date.now(), this.lastMessageId + 1);

    const messageObj = {
      id: this.lastMessageId,
      username,
      message,
      timestamp: new Date().toISOString()
    };

    this.messages.push(messageObj);
    this.emit('message', messageObj);
    return true;
  }

  /**
   * @returns {string[]} A new array with the current usernames.
   */
  getUsers() {
    return Array.from(this.users);
  }

  /**
   * @returns {object[]} A shallow copy of the message list (the message
   *   objects themselves are shared, not cloned).
   */
  getMessages() {
    return [...this.messages];
  }
}

// Usage
const chatRoom = new ChatRoom('General');

// One console reporter per event the room emits
chatRoom.on('userJoined', ({ username, room }) => {
  console.log(`${username} joined ${room}`);
});

chatRoom.on('userLeft', ({ username, room }) => {
  console.log(`${username} left ${room}`);
});

chatRoom.on('message', ({ timestamp, username, message }) => {
  console.log(`[${timestamp}] ${username}: ${message}`);
});

chatRoom.on('error', ({ message }) => {
  console.error('Chat room error:', message);
});

// Use the chat room: two users join, each sends a message, then Alice leaves
for (const newcomer of ['Alice', 'Bob']) {
  chatRoom.addUser(newcomer);
}
chatRoom.sendMessage('Alice', 'Hello everyone!');
chatRoom.sendMessage('Bob', 'Hi Alice!');
chatRoom.removeUser('Alice');

事件发射器模式

javascript
// event-patterns.js
const EventEmitter = require('events');

/**
 * Processor that upper-cases strings one at a time (others wait in a FIFO
 * queue) and reports progress through events.
 *
 * Events emitted:
 *  - 'queued'    `{ data, queueLength }` when an item has to wait
 *  - 'started'   the input string when work on it begins
 *  - 'processed' the upper-cased result
 *  - 'completed' `{ input, output }`, right after 'processed'
 */
class DataProcessor extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.delayMs=1000] Simulated processing time per item.
   */
  constructor({ delayMs = 1000 } = {}) {
    super();
    this.processing = false;
    this.queue = [];
    this.delayMs = delayMs;
  }

  /**
   * Process `data` and log its result exactly once.
   *
   * The listener matches on the 'completed' payload's input, so when the
   * processor is busy it waits for this item instead of logging whichever
   * item happens to finish next.
   *
   * @param {string} data
   * @throws {TypeError} when `data` is not a string.
   */
  processOnce(data) {
    // process() validates synchronously and results only ever arrive from a
    // timer, so attaching the listener afterwards cannot miss the event.
    this.process(data);

    const onCompleted = ({ input, output }) => {
      if (input !== data) return;
      this.removeListener('completed', onCompleted);
      console.log('One-time processing result:', output);
    };
    this.on('completed', onCompleted);
  }

  /**
   * Start processing `data`, or queue it when another item is in flight.
   *
   * @param {string} data
   * @throws {TypeError} when `data` is not a string. Failing here, at the
   *   call site, avoids an uncaught exception inside the timer callback.
   */
  process(data) {
    if (typeof data !== 'string') {
      throw new TypeError('DataProcessor.process expects a string');
    }

    if (this.processing) {
      this.queue.push(data);
      this.emit('queued', { data, queueLength: this.queue.length });
      return;
    }

    this.processing = true;
    this.emit('started', data);

    // Simulate processing
    setTimeout(() => {
      const result = data.toUpperCase();
      this.processing = false;

      this.emit('processed', result);
      this.emit('completed', { input: data, output: result });

      // Process next item in queue
      if (this.queue.length > 0) {
        this.process(this.queue.shift());
      }
    }, this.delayMs);
  }

  /** Remove every listener registered for the 'processed' event. */
  removeAllProcessedListeners() {
    this.removeAllListeners('processed');
  }

  /**
   * @param {string} event Event name.
   * @returns {number} Number of listeners currently registered for `event`.
   */
  getListenerCount(event) {
    return this.listenerCount(event);
  }
}

const processor = new DataProcessor();

// Two independent listeners on the same 'processed' event
processor.on('processed', (result) => {
  console.log('Listener 1 - Processed:', result);
});
processor.on('processed', (result) => {
  console.log('Listener 2 - Saving to database:', result);
});

// Progress reporting
processor.on('started', (data) => {
  console.log('Processing started for:', data);
});
processor.on('queued', ({ queueLength }) => {
  console.log(`Item queued. Queue length: ${queueLength}`);
});

// Submit three items back to back, then report the 'processed' listener count
for (const item of ['hello', 'world', 'nodejs']) {
  processor.process(item);
}

const processedListeners = processor.getListenerCount('processed');
console.log('Processed listeners:', processedListeners);

异步编程模式

回调

处理异步操作的传统 Node.js 模式:

javascript
// callbacks.js
const fs = require('fs');

// Error-first callback pattern
/**
 * Read a file as UTF-8 text and report the outcome error-first.
 *
 * @param {string} filename Path of the file to read.
 * @param {(error: Error|null, data: string|null) => void} callback
 *   Receives `(error, null)` on failure or `(null, data)` on success.
 */
function readFileCallback(filename, callback) {
  const onRead = (error, data) => {
    if (!error) {
      callback(null, data);
      return;
    }
    callback(error, null);
  };

  fs.readFile(filename, 'utf8', onRead);
}

// Callback hell example (what to avoid)
/**
 * Deliberate anti-pattern: read file1.txt, file2.txt and file3.txt one after
 * another by nesting each read inside the previous read's callback.
 *
 * Any failed read logs an error and stops the chain. When all three succeed,
 * the combined length of their contents is logged. Nothing is returned.
 */
function processFiles() {
  readFileCallback('file1.txt', (error1, data1) => {
    if (error1) {
      console.error('Error reading file1:', error1.message);
      return;
    }
    
    // The second read only starts once the first one has succeeded
    readFileCallback('file2.txt', (error2, data2) => {
      if (error2) {
        console.error('Error reading file2:', error2.message);
        return;
      }
      
      // Third level of nesting: data1 and data2 are still in scope via closure
      readFileCallback('file3.txt', (error3, data3) => {
        if (error3) {
          console.error('Error reading file3:', error3.message);
          return;
        }
        
        console.log('All files read successfully');
        console.log('Combined length:', data1.length + data2.length + data3.length);
      });
    });
  });
}

// Better callback pattern with error handling
/**
 * Read several files in parallel and hand back their contents in input order.
 *
 * @param {string[]} filenames Paths to read as UTF-8 text.
 * @param {(error: Error|null, results: string[]|null) => void} callback
 *   Called exactly once: with the first error encountered, or with the
 *   contents array once every read has finished.
 */
function readMultipleFiles(filenames, callback) {
  // With nothing to read, no per-file callback would ever fire and the caller
  // would wait forever. Report the empty result instead; nextTick keeps the
  // callback asynchronous, like the non-empty path.
  if (filenames.length === 0) {
    process.nextTick(callback, null, []);
    return;
  }

  const results = [];
  let completed = 0;
  let hasError = false;

  filenames.forEach((filename, index) => {
    readFileCallback(filename, (error, data) => {
      // After the first failure has been reported, ignore late completions
      if (hasError) return;

      if (error) {
        hasError = true;
        return callback(error, null);
      }

      // Store by index so output order matches input order, not finish order
      results[index] = data;
      completed++;

      if (completed === filenames.length) {
        callback(null, results);
      }
    });
  });
}

// Usage: read a single file and report how many results came back
readMultipleFiles(['package.json'], (error, results) => {
  if (!error) {
    console.log('Files read successfully:', results.length);
    return;
  }

  console.error('Error:', error.message);
});

Promise

Promise 提供了一种更清晰的处理异步操作的方式:

javascript
// promises.js
const fs = require('fs').promises;

// Basic promise usage
/**
 * Read a file as UTF-8 text.
 *
 * @param {string} filename Path of the file to read.
 * @returns {Promise<string>} The promise returned by fs.readFile.
 */
function readFilePromise(filename) {
  const encoding = 'utf8';
  return fs.readFile(filename, encoding);
}

// Promise chaining: each step receives the value returned by the previous
// one, and a failure in any step lands in the single catch handler.
const parsePackageJson = (data) => {
  console.log('File read successfully');
  return JSON.parse(data);
};

const reportPackage = (packageInfo) => {
  console.log('Package name:', packageInfo.name);
  console.log('Package version:', packageInfo.version);
  return packageInfo;
};

const reportFailure = (error) => {
  console.error('Error:', error.message);
};

readFilePromise('package.json')
  .then(parsePackageJson)
  .then(reportPackage)
  .catch(reportFailure);

// Creating custom promises
/**
 * Promise-based timer.
 *
 * @param {number} ms Delay passed to setTimeout.
 * @returns {Promise<void>} Resolves once the timer fires.
 */
function delay(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Simulate an API call that looks up a user.
 *
 * @param {number} userId Identifier to look up; must be greater than 0.
 * @param {number} [delayMs=1000] Simulated latency before the promise
 *   settles. Exposed so callers (and tests) need not wait a full second.
 * @returns {Promise<{id: number, name: string, email: string}>} Resolves with
 *   a generated user record, or rejects with `Error('Invalid user ID')` when
 *   `userId` is not greater than 0.
 */
function fetchUserData(userId, delayMs = 1000) {
  return new Promise((resolve, reject) => {
    // Simulate API call
    setTimeout(() => {
      if (userId > 0) {
        resolve({
          id: userId,
          name: `User ${userId}`,
          email: `user${userId}@example.com`
        });
      } else {
        reject(new Error('Invalid user ID'));
      }
    }, delayMs);
  });
}

// Promise.all - fulfils with every value once all inputs have fulfilled,
// or rejects as soon as one input rejects
Promise.all([1, 2, 3].map((userId) => fetchUserData(userId)))
  .then((users) => {
    console.log('All users fetched:', users.length);
    for (const { name } of users) {
      console.log(`- ${name}`);
    }
  })
  .catch((error) => {
    console.error('Error fetching users:', error.message);
  });

// Promise.allSettled - waits for every input and reports each outcome;
// the -1 lookup rejects
Promise.allSettled([1, -1, 2].map((userId) => fetchUserData(userId)))
  .then((results) => {
    console.log('All promises settled:');
    for (const [index, { status, value, reason }] of results.entries()) {
      if (status !== 'fulfilled') {
        console.log(`User ${index + 1} failed:`, reason.message);
        continue;
      }
      console.log(`User ${index + 1}:`, value.name);
    }
  });

// Promise.race - settles with whichever input settles first
Promise.race([
  fetchUserData(1),
  delay(500).then(() => 'Timeout')
]).then((winner) => {
  console.log('Race result:', winner);
});

Async/Await

处理异步操作的现代方式:

javascript
// async-await.js
const fs = require('fs').promises;

// Basic async/await
/**
 * Read and parse package.json from the current working directory.
 *
 * @returns {Promise<any>} The parsed JSON content.
 * @throws Re-throws the read or parse error after logging its message.
 */
async function readPackageInfo() {
  try {
    return JSON.parse(await fs.readFile('package.json', 'utf8'));
  } catch (error) {
    console.error('Error reading package.json:', error.message);
    throw error;
  }
}

// Sequential operations
/**
 * Run three steps strictly one after another: read package.json, wait one
 * second, then parse the text that was read.
 *
 * @returns {Promise<any>} The parsed package.json content.
 * @throws Re-throws any failure after logging its message.
 */
async function processFilesSequentially() {
  try {
    console.log('Starting sequential processing...');

    const rawText = await fs.readFile('package.json', 'utf8');
    console.log('File 1 read, length:', rawText.length);

    // Wait 1 second before continuing
    await delay(1000);

    const parsed = JSON.parse(rawText);
    console.log('Package parsed:', parsed.name);
    return parsed;
  } catch (error) {
    console.error('Sequential processing error:', error.message);
    throw error;
  }
}

// Parallel operations
/**
 * Start a file read and a one-second delay together and wait for both.
 *
 * @returns {Promise<{file1: string, file2: string}>} The package.json text
 *   and the string produced after the delay.
 * @throws Re-throws any failure after logging its message.
 */
async function processFilesParallel() {
  try {
    console.log('Starting parallel processing...');

    // Both operations are started before either one is awaited
    const pending = [
      fs.readFile('package.json', 'utf8'),
      delay(1000).then(() => 'delayed result')
    ];
    const [file1, file2] = await Promise.all(pending);

    console.log('Both operations completed');
    console.log('File length:', file1.length);
    console.log('Delayed result:', file2);

    return { file1, file2 };
  } catch (error) {
    console.error('Parallel processing error:', error.message);
    throw error;
  }
}

// Error handling with async/await
/**
 * Try to read a fixed list of files one at a time, recording a per-file
 * outcome instead of stopping at the first failure.
 *
 * @returns {Promise<object[]>} One entry per file, in list order:
 *   `{ filename, success: true, size }` or `{ filename, success: false, error }`.
 */
async function robustFileProcessor() {
  const report = [];

  for (const filename of ['package.json', 'nonexistent.txt', 'README.md']) {
    let entry;
    try {
      const text = await fs.readFile(filename, 'utf8');
      entry = { filename, success: true, size: text.length };
    } catch (error) {
      entry = { filename, success: false, error: error.message };
    }
    report.push(entry);
  }

  return report;
}

// Using async/await with loops
/**
 * Process items one at a time, in order. An item whose processing fails is
 * logged and left out of the output; the remaining items still run.
 *
 * @param {Iterable<any>} items Items to hand to processItem.
 * @returns {Promise<any[]>} Results of the items that succeeded.
 */
async function processItemsSequentially(items) {
  const collected = [];

  for (const item of items) {
    try {
      collected.push(await processItem(item));
    } catch (error) {
      console.error(`Error processing ${item}:`, error.message);
    }
  }

  return collected;
}

/**
 * Start processing every item at once and wait for all of them.
 *
 * @param {any[]} items Items to hand to processItem.
 * @returns {Promise<any[]>} Results in the same order as `items`.
 * @throws Re-throws the first rejection after logging its message.
 */
async function processItemsConcurrently(items) {
  // All work is started here, before anything is awaited
  const inFlight = items.map((item) => processItem(item));

  try {
    return await Promise.all(inFlight);
  } catch (error) {
    console.error('Error in concurrent processing:', error.message);
    throw error;
  }
}

/**
 * Simulate work on one item by waiting a random time below one second.
 *
 * @param {any} item Value to include in the result string.
 * @returns {Promise<string>} `Processed: <item>`.
 */
async function processItem(item) {
  const waitMs = Math.random() * 1000;
  await delay(waitMs);
  return `Processed: ${item}`;
}

// Helper function
/**
 * Promise-based timer.
 *
 * @param {number} ms Delay passed to setTimeout.
 * @returns {Promise<void>} Resolves once the timer fires.
 */
function delay(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

// Usage examples
/**
 * Run the examples in order: sequential processing, parallel processing, the
 * per-file error report, then sequential item processing. Any error that
 * escapes one of them is logged here and not re-thrown.
 */
async function main() {
  try {
    // Sequential, then parallel file processing
    await processFilesSequentially();
    await processFilesParallel();

    // Per-file outcomes; individual failures do not throw
    const fileReport = await robustFileProcessor();
    console.log('File processing results:', fileReport);

    // Item processing, one at a time
    const sequentialResults = await processItemsSequentially(['item1', 'item2', 'item3']);
    console.log('Sequential results:', sequentialResults);
  } catch (error) {
    console.error('Main function error:', error.message);
  }
}

main();

深入理解流

理解流

流是处理数据的强大抽象:

javascript
// streams-advanced.js
const { Readable, Writable, Transform, pipeline } = require('stream');
const fs = require('fs');

// Custom Readable Stream
/**
 * Readable stream that emits the integers 1..max, one per line, then ends.
 */
class NumberGenerator extends Readable {
  /**
   * @param {object} [options] Readable options plus `max`.
   * @param {number} [options.max=10] Last number to emit. 0 is honoured and
   *   yields an empty stream.
   */
  constructor(options = {}) {
    super(options);
    this.current = 1;
    // `??` rather than `||` so that an explicit max of 0 is not replaced
    this.max = options.max ?? 10;
  }

  /** Push the next number, or null to signal end-of-stream. */
  _read() {
    if (this.current <= this.max) {
      this.push(`${this.current}\n`);
      this.current++;
    } else {
      this.push(null); // End of stream
    }
  }
}

// Custom Transform Stream
/**
 * Transform stream that reads newline-separated integers and writes one
 * "n^2 = result" line per input number.
 *
 * Input is buffered by line, so a chunk may contain several numbers or end
 * in the middle of one. Blank lines are skipped; a line that does not start
 * with an integer fails the stream with a TypeError.
 */
class NumberSquarer extends Transform {
  constructor(options) {
    super(options);
    // Text after the last newline seen so far (an incomplete line)
    this.partialLine = '';
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.partialLine + chunk.toString()).split('\n');
    // The last piece has no terminating newline yet; keep it for later
    this.partialLine = lines.pop();

    try {
      for (const line of lines) {
        this.squareLine(line);
      }
      callback();
    } catch (error) {
      callback(error);
    }
  }

  _flush(callback) {
    // Handle a final line that was not newline-terminated
    try {
      this.squareLine(this.partialLine);
      this.partialLine = '';
      callback();
    } catch (error) {
      callback(error);
    }
  }

  /**
   * Parse one line and push its squared form.
   * @param {string} line
   * @throws {TypeError} when the line is not blank and is not an integer.
   */
  squareLine(line) {
    const text = line.trim();
    if (text === '') return;

    const number = Number.parseInt(text, 10);
    if (Number.isNaN(number)) {
      throw new TypeError(`Not an integer: ${text}`);
    }

    this.push(`${number}^2 = ${number * number}\n`);
  }
}

// Custom Writable Stream
/**
 * Writable stream that echoes each chunk to the console and appends it,
 * prefixed with an ISO timestamp, to a log file.
 */
class Logger extends Writable {
  /**
   * @param {object} [options] Writable options plus `logPath`.
   * @param {string} [options.logPath='numbers.log'] File to append to.
   */
  constructor(options = {}) {
    super(options);
    this.logFile = fs.createWriteStream(options.logPath ?? 'numbers.log', { flags: 'a' });

    // Without a listener, an 'error' on the file stream (for example a failed
    // open) would be thrown as an unhandled 'error' event. Surface it as this
    // stream's own error instead.
    this.logFile.on('error', (error) => this.destroy(error));
  }

  _write(chunk, encoding, callback) {
    const timestamp = new Date().toISOString();
    const logEntry = `[${timestamp}] ${chunk}`;

    console.log('Logging:', chunk.toString().trim());
    // Passing `callback` through signals completion only after the file
    // write has finished, and forwards any write error.
    this.logFile.write(logEntry, callback);
  }

  _final(callback) {
    // Finish the file before this stream reports 'finish'
    this.logFile.end(callback);
  }

  _destroy(error, callback) {
    // Release the file stream when this stream is torn down, including on
    // the error path where _final is never reached
    this.logFile.destroy();
    callback(error);
  }
}

// Using streams with pipeline: generator -> squarer -> logger
const numberGen = new NumberGenerator({ max: 5 });
const squarer = new NumberSquarer();
const logger = new Logger();

// The final callback receives an error if any stage failed
const onPipelineDone = (error) => {
  if (!error) {
    console.log('Pipeline completed successfully');
    return;
  }
  console.error('Pipeline error:', error);
};

pipeline(numberGen, squarer, logger, onPipelineDone);

流工具

javascript
// stream-utilities.js
const { pipeline, Transform } = require('stream');
const fs = require('fs');

// CSV Parser Transform Stream
/**
 * Transform stream that turns delimiter-separated text into one object per
 * data row, keyed by the (trimmed) values of the first non-blank line.
 *
 * Input is buffered by line, so a row that is split across two chunks is
 * still parsed as a single row. Quoted fields are not supported: every
 * delimiter occurrence separates two values.
 */
class CSVParser extends Transform {
  /**
   * @param {object} [options]
   * @param {string} [options.delimiter=','] Field separator.
   */
  constructor(options = {}) {
    super({ objectMode: true });
    this.headers = null;
    this.delimiter = options.delimiter || ',';
    // Text after the last newline seen so far (an incomplete row)
    this.partialLine = '';
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.partialLine + chunk.toString()).split('\n');
    // The last piece has no terminating newline yet; keep it for later
    this.partialLine = lines.pop();

    for (const line of lines) {
      this.parseLine(line);
    }

    callback();
  }

  _flush(callback) {
    // Emit a final row that was not newline-terminated
    this.parseLine(this.partialLine);
    this.partialLine = '';
    callback();
  }

  /**
   * Parse one complete line: the first non-blank line becomes the header
   * row, every later non-blank line is pushed as an object.
   * @param {string} line
   */
  parseLine(line) {
    if (!line.trim()) return;

    const values = line.split(this.delimiter);

    if (!this.headers) {
      this.headers = values;
      return;
    }

    const row = {};
    this.headers.forEach((header, index) => {
      // Missing trailing columns become empty strings
      row[header.trim()] = values[index]?.trim() || '';
    });

    this.push(row);
  }
}

// JSON Stringifier Transform Stream
/**
 * Transform stream that serializes incoming objects as the elements of one
 * pretty-printed JSON array.
 */
class JSONStringifier extends Transform {
  constructor() {
    super({ objectMode: true });
    // True until the first element has been written
    this.first = true;
  }

  _transform(chunk, encoding, callback) {
    // Open the array before the first element, separate later ones
    if (this.first) {
      this.push('[\n');
      this.first = false;
    } else {
      this.push(',\n');
    }

    this.push(JSON.stringify(chunk, null, 2));
    callback();
  }

  _flush(callback) {
    // When no element ever arrived the opening bracket was never written;
    // emit a complete empty array so the output is still valid JSON.
    this.push(this.first ? '[\n]' : '\n]');
    callback();
  }
}

// Usage example
/**
 * Convert a CSV file into a JSON array file by piping it through CSVParser
 * and JSONStringifier.
 *
 * @param {string} inputFile Path of the CSV file to read.
 * @param {string} outputFile Path of the JSON file to write.
 * @returns {Promise<void>} Resolves when the pipeline completes, rejects
 *   with the pipeline's error otherwise.
 */
function convertCSVToJSON(inputFile, outputFile) {
  return new Promise((resolve, reject) => {
    const onDone = (error) => {
      if (!error) {
        resolve();
        return;
      }
      reject(error);
    };

    pipeline(
      fs.createReadStream(inputFile),
      new CSVParser(),
      new JSONStringifier(),
      fs.createWriteStream(outputFile),
      onDone
    );
  });
}

内存管理

理解内存使用

javascript
// memory-management.js

/**
 * Print the fields of process.memoryUsage() in megabytes.
 *
 * @returns {object} The raw object returned by process.memoryUsage().
 */
function analyzeMemoryUsage() {
  const usage = process.memoryUsage();
  const { rss, heapTotal, heapUsed, external, arrayBuffers } = usage;

  // One report line per field, printed in this order after the heading
  const report = [
    `RSS: ${formatBytes(rss)} - Resident Set Size`,
    `Heap Total: ${formatBytes(heapTotal)} - Total heap allocated`,
    `Heap Used: ${formatBytes(heapUsed)} - Heap actually used`,
    `External: ${formatBytes(external)} - C++ objects bound to JS`,
    `Array Buffers: ${formatBytes(arrayBuffers)} - ArrayBuffer and SharedArrayBuffer`
  ];

  console.log('Memory Usage:');
  for (const line of report) {
    console.log(line);
  }

  return usage;
}

/**
 * Format a byte count as megabytes with two decimals.
 *
 * @param {number} bytes
 * @returns {string} For example "1.50 MB".
 */
function formatBytes(bytes) {
  const BYTES_PER_MB = 1024 * 1024;
  return `${(bytes / BYTES_PER_MB).toFixed(2)} MB`;
}

// Memory leak example (what to avoid)
/**
 * Deliberate leak demonstration: once constructed, a one-second interval
 * keeps appending million-element arrays to `data` until cleanup() is called.
 */
class MemoryLeakExample {
  constructor() {
    this.data = [];

    // Every tick retains another large array, so memory grows without bound
    const growForever = () => {
      this.data.push(new Array(1000000).fill('data'));
    };
    this.interval = setInterval(growForever, 1000);
  }

  /** Stop the interval and drop the reference to the accumulated arrays. */
  cleanup() {
    clearInterval(this.interval);
    this.data = null;
  }
}

// Proper memory management
/**
 * Upper-cases strings and remembers results in a size-bounded cache.
 */
class DataProcessor {
  /**
   * @param {object} [options]
   * @param {number} [options.maxCacheSize=1000] Maximum number of cached
   *   results; the oldest inserted entry is evicted first.
   */
  constructor({ maxCacheSize = 1000 } = {}) {
    this.cache = new Map();
    this.maxCacheSize = maxCacheSize;
  }

  /**
   * @param {string} data Input string.
   * @returns {string} The upper-cased input, served from the cache when the
   *   same input has been processed before.
   */
  process(data) {
    // Consult the cache first; otherwise it would only ever be written to
    if (this.cache.has(data)) {
      return this.cache.get(data);
    }

    // Process data
    const result = data.toUpperCase();

    // A non-positive limit disables caching entirely
    if (this.maxCacheSize <= 0) {
      return result;
    }

    // Make room for the new key. A Map iterates in insertion order, so the
    // first key is the oldest entry.
    if (this.cache.size >= this.maxCacheSize) {
      const firstKey = this.cache.keys().next().value;
      this.cache.delete(firstKey);
    }

    this.cache.set(data, result);
    return result;
  }

  /** Drop every cached result. */
  clearCache() {
    this.cache.clear();
  }
}

// Monitor memory usage
/**
 * Print memory usage repeatedly.
 *
 * @param {number} [interval=5000] Milliseconds between reports.
 * @returns {() => void} Function that stops the reporting when called.
 */
function monitorMemory(interval = 5000) {
  const report = () => {
    analyzeMemoryUsage();
    console.log('---');
  };
  const timer = setInterval(report, interval);

  // Cleanup function
  return () => clearInterval(timer);
}

// Usage: print a baseline, then keep reporting until the monitor is stopped
console.log('Initial memory usage:');
analyzeMemoryUsage();

const processor = new DataProcessor();
const stopMonitoring = monitorMemory();

// Simulate work: let the monitor run for 15 seconds, then stop it
const MONITOR_DURATION_MS = 15000;
const endMonitoring = () => {
  console.log('Stopping memory monitor...');
  stopMonitoring();
};
setTimeout(endMonitoring, MONITOR_DURATION_MS);

错误处理策略

全面的错误处理

javascript
// error-handling-strategies.js
const EventEmitter = require('events');

// Custom error classes
/**
 * Error for invalid input; `field` names the offending field.
 */
class ValidationError extends Error {
  /**
   * @param {string} message Human-readable description.
   * @param {string} field Name of the field that failed validation.
   */
  constructor(message, field) {
    super(message);
    Object.assign(this, { name: 'ValidationError', field });
  }
}

/**
 * Error for database failures; `code` carries a short failure code.
 */
class DatabaseError extends Error {
  /**
   * @param {string} message Human-readable description.
   * @param {string} code Failure code, e.g. 'TIMEOUT'.
   */
  constructor(message, code) {
    super(message);
    Object.assign(this, { name: 'DatabaseError', code });
  }
}

// Error handling service
/**
 * Central error reporter: logs errors, classifies them by type and
 * re-publishes them as events.
 *
 * Events emitted: 'criticalError', 'unhandledRejection', 'warning' (from the
 * process-level hooks) and 'validationError', 'databaseError', 'error' (from
 * handleError).
 *
 * Note: every instance registers its own listeners on `process`.
 */
class ErrorHandler extends EventEmitter {
  constructor() {
    super();
    this.setupGlobalHandlers();
  }

  /** Attach the process-level hooks and forward them as events. */
  setupGlobalHandlers() {
    // Handle uncaught exceptions: log, notify listeners, then exit with code 1
    process.on('uncaughtException', (error) => {
      console.error('Uncaught Exception:', error);
      this.emit('criticalError', error);
      process.exit(1);
    });

    // Handle unhandled promise rejections
    process.on('unhandledRejection', (reason, promise) => {
      console.error('Unhandled Rejection at:', promise, 'reason:', reason);
      this.emit('unhandledRejection', { reason, promise });
    });

    // Handle warnings
    process.on('warning', (warning) => {
      console.warn('Warning:', warning.message);
      this.emit('warning', warning);
    });
  }

  /**
   * Log an error and emit the event matching its type.
   *
   * @param {Error} error The error to report.
   * @param {object} [context={}] Extra data stored with the report.
   * @returns {{message: string, stack: string, name: string,
   *   timestamp: string, context: object}} The report that was logged.
   */
  handleError(error, context = {}) {
    const errorInfo = {
      message: error.message,
      stack: error.stack,
      name: error.name,
      timestamp: new Date().toISOString(),
      context
    };

    // Log error based on type
    if (error instanceof ValidationError) {
      console.warn('Validation Error:', errorInfo);
      this.emit('validationError', errorInfo);
    } else if (error instanceof DatabaseError) {
      console.error('Database Error:', errorInfo);
      this.emit('databaseError', errorInfo);
    } else {
      console.error('General Error:', errorInfo);
      // EventEmitter throws when 'error' is emitted with no listener
      // attached. Reporting an error must not raise a second one (it would
      // replace the original error in asyncWrapper), so emit only when
      // someone is listening.
      if (this.listenerCount('error') > 0) {
        this.emit('error', errorInfo);
      }
    }

    return errorInfo;
  }

  /**
   * Wrap an async function so that any error it throws is reported through
   * handleError and then re-thrown unchanged.
   *
   * @param {Function} fn Function to wrap.
   * @returns {(...args: any[]) => Promise<any>} Wrapper with the same result.
   */
  asyncWrapper(fn) {
    return async (...args) => {
      try {
        return await fn(...args);
      } catch (error) {
        this.handleError(error, { function: fn.name, args });
        throw error;
      }
    };
  }
}

// Usage example
const errorHandler = new ErrorHandler();

// React to each error category separately; on() returns the emitter, so the
// registrations can be chained
errorHandler
  .on('validationError', () => {
    // Send to monitoring service
    console.log('Sending validation error to monitoring...');
  })
  .on('databaseError', () => {
    // Alert database team
    console.log('Alerting database team...');
  });

// Example functions with error handling
/**
 * Check that the user data carries a plausible email address.
 *
 * @param {{email?: string}} userData
 * @returns {Promise<true>} Resolves with true when the email is accepted.
 * @throws {ValidationError} When the email is missing or has no '@'.
 */
async function validateUser(userData) {
  const email = userData.email;

  if (!email) {
    throw new ValidationError('Email is required', 'email');
  }
  if (!email.includes('@')) {
    throw new ValidationError('Invalid email format', 'email');
  }

  return true;
}

/**
 * Pretend to persist a user.
 *
 * @param {object} userData Fields to store.
 * @returns {Promise<object>} The user data with an `id` taken from Date.now();
 *   an `id` already present in `userData` takes precedence.
 * @throws {DatabaseError} For the email 'error@test.com' (simulated timeout).
 */
async function saveUser(userData) {
  // Simulate database error
  const shouldFail = userData.email === 'error@test.com';
  if (shouldFail) {
    throw new DatabaseError('Connection timeout', 'TIMEOUT');
  }

  const record = { id: Date.now(), ...userData };
  return record;
}

// Wrapped functions: same behavior as the originals, but any error they
// throw is passed to errorHandler.handleError before being re-thrown
const [safeValidateUser, safeSaveUser] = [validateUser, saveUser].map(
  (fn) => errorHandler.asyncWrapper(fn)
);

// Test error handling
/**
 * Trigger one validation failure and one database failure through the
 * wrapped functions, logging a line each time the error is caught.
 */
async function testErrorHandling() {
  const attempts = [
    [() => safeValidateUser({ email: 'invalid-email' }), 'Caught validation error'],
    [() => safeSaveUser({ email: 'error@test.com' }), 'Caught database error']
  ];

  // Run one after another; each failure is caught before the next attempt
  for (const [attempt, caughtMessage] of attempts) {
    try {
      await attempt();
    } catch (error) {
      console.log(caughtMessage);
    }
  }
}

testErrorHandling();

下一步

在下一章中,我们将探索如何有效地使用组件和模块组织 Node.js 应用程序。

实践练习

  1. 为实时通知系统创建自定义 EventEmitter
  2. 构建基于流的文件处理管道
  3. 使用流实现内存高效的数据处理器
  4. 为 Web 应用程序创建全面的错误处理系统

关键要点

  • 事件驱动编程是 Node.js 架构的基础
  • EventEmitter 为许多 Node.js 模式提供基础
  • Async/await 是处理异步操作的首选方式
  • 流支持高效处理大型数据集
  • 适当的错误处理防止应用程序崩溃
  • 内存管理对于长时间运行的应用程序至关重要
  • 理解事件循环有助于优化性能

本站内容仅供学习和研究使用。