核心概念
概述
本章介绍使 Node.js 独特而强大的基本概念:事件驱动编程、事件发射器模式、回调、Promise、async/await 以及 Node.js 事件循环的详细内容。
事件驱动编程
理解事件
Node.js 是围绕事件构建的。Node.js 中的许多对象都会发出事件,你可以监听这些事件并做出相应的响应:
javascript
// basic-events.js
const EventEmitter = require('events');
// Create an event emitter
const myEmitter = new EventEmitter();
// Listen for events
myEmitter.on('message', (data) => {
console.log('Received message:', data);
});
myEmitter.on('error', (error) => {
console.error('Error occurred:', error.message);
});
// Emit events
myEmitter.emit('message', 'Hello, World!');
myEmitter.emit('message', { user: 'John', text: 'How are you?' });
myEmitter.emit('error', new Error('Something went wrong'));自定义事件发射器
创建继承 EventEmitter 的自定义类:
javascript
// custom-emitter.js
const EventEmitter = require('events');
class ChatRoom extends EventEmitter {
constructor(name) {
super();
this.name = name;
this.users = new Set();
this.messages = [];
}
addUser(username) {
if (this.users.has(username)) {
this.emit('error', new Error('User already exists'));
return false;
}
this.users.add(username);
this.emit('userJoined', { username, room: this.name });
return true;
}
removeUser(username) {
if (!this.users.has(username)) {
this.emit('error', new Error('User not found'));
return false;
}
this.users.delete(username);
this.emit('userLeft', { username, room: this.name });
return true;
}
sendMessage(username, message) {
if (!this.users.has(username)) {
this.emit('error', new Error('User not in room'));
return false;
}
const messageObj = {
id: Date.now(),
username,
message,
timestamp: new Date().toISOString()
};
this.messages.push(messageObj);
this.emit('message', messageObj);
return true;
}
getUsers() {
return Array.from(this.users);
}
getMessages() {
return [...this.messages];
}
}
// Usage
const chatRoom = new ChatRoom('General');
// Listen for events
chatRoom.on('userJoined', (data) => {
console.log(`${data.username} joined ${data.room}`);
});
chatRoom.on('userLeft', (data) => {
console.log(`${data.username} left ${data.room}`);
});
chatRoom.on('message', (data) => {
console.log(`[${data.timestamp}] ${data.username}: ${data.message}`);
});
chatRoom.on('error', (error) => {
console.error('Chat room error:', error.message);
});
// Use the chat room
chatRoom.addUser('Alice');
chatRoom.addUser('Bob');
chatRoom.sendMessage('Alice', 'Hello everyone!');
chatRoom.sendMessage('Bob', 'Hi Alice!');
chatRoom.removeUser('Alice');事件发射器模式
javascript
// event-patterns.js
const EventEmitter = require('events');
class DataProcessor extends EventEmitter {
constructor() {
super();
this.processing = false;
this.queue = [];
}
// Once - listen for event only once
processOnce(data) {
this.once('processed', (result) => {
console.log('One-time processing result:', result);
});
this.process(data);
}
// Multiple listeners for same event
process(data) {
if (this.processing) {
this.queue.push(data);
this.emit('queued', { data, queueLength: this.queue.length });
return;
}
this.processing = true;
this.emit('started', data);
// Simulate processing
setTimeout(() => {
const result = data.toUpperCase();
this.processing = false;
this.emit('processed', result);
this.emit('completed', { input: data, output: result });
// Process next item in queue
if (this.queue.length > 0) {
const nextData = this.queue.shift();
this.process(nextData);
}
}, 1000);
}
// Remove listeners
removeAllProcessedListeners() {
this.removeAllListeners('processed');
}
// Get listener count
getListenerCount(event) {
return this.listenerCount(event);
}
}
const processor = new DataProcessor();
// Multiple listeners for the same event
processor.on('processed', (result) => {
console.log('Listener 1 - Processed:', result);
});
processor.on('processed', (result) => {
console.log('Listener 2 - Saving to database:', result);
});
processor.on('started', (data) => {
console.log('Processing started for:', data);
});
processor.on('queued', (info) => {
console.log(`Item queued. Queue length: ${info.queueLength}`);
});
// Process multiple items
processor.process('hello');
processor.process('world');
processor.process('nodejs');
console.log('Processed listeners:', processor.getListenerCount('processed'));异步编程模式
回调
处理异步操作的传统 Node.js 模式:
javascript
// callbacks.js
const fs = require('fs');
// Error-first callback pattern
function readFileCallback(filename, callback) {
fs.readFile(filename, 'utf8', (error, data) => {
if (error) {
return callback(error, null);
}
callback(null, data);
});
}
// Callback hell example (what to avoid)
function processFiles() {
readFileCallback('file1.txt', (error1, data1) => {
if (error1) {
console.error('Error reading file1:', error1.message);
return;
}
readFileCallback('file2.txt', (error2, data2) => {
if (error2) {
console.error('Error reading file2:', error2.message);
return;
}
readFileCallback('file3.txt', (error3, data3) => {
if (error3) {
console.error('Error reading file3:', error3.message);
return;
}
console.log('All files read successfully');
console.log('Combined length:', data1.length + data2.length + data3.length);
});
});
});
}
// Better callback pattern with error handling
function readMultipleFiles(filenames, callback) {
const results = [];
let completed = 0;
let hasError = false;
filenames.forEach((filename, index) => {
readFileCallback(filename, (error, data) => {
if (hasError) return;
if (error) {
hasError = true;
return callback(error, null);
}
results[index] = data;
completed++;
if (completed === filenames.length) {
callback(null, results);
}
});
});
}
// Usage
readMultipleFiles(['package.json'], (error, results) => {
if (error) {
console.error('Error:', error.message);
return;
}
console.log('Files read successfully:', results.length);
});Promise
Promise 提供了一种更清晰的处理异步操作的方式:
javascript
// promises.js
const fs = require('fs').promises;
// Basic promise usage
function readFilePromise(filename) {
return fs.readFile(filename, 'utf8');
}
// Promise chaining
readFilePromise('package.json')
.then(data => {
console.log('File read successfully');
return JSON.parse(data);
})
.then(packageInfo => {
console.log('Package name:', packageInfo.name);
console.log('Package version:', packageInfo.version);
return packageInfo;
})
.catch(error => {
console.error('Error:', error.message);
});
// Creating custom promises
function delay(ms) {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}
function fetchUserData(userId) {
return new Promise((resolve, reject) => {
// Simulate API call
setTimeout(() => {
if (userId > 0) {
resolve({
id: userId,
name: `User ${userId}`,
email: `user${userId}@example.com`
});
} else {
reject(new Error('Invalid user ID'));
}
}, 1000);
});
}
// Promise.all - wait for all promises to complete
Promise.all([
fetchUserData(1),
fetchUserData(2),
fetchUserData(3)
])
.then(users => {
console.log('All users fetched:', users.length);
users.forEach(user => console.log(`- ${user.name}`));
})
.catch(error => {
console.error('Error fetching users:', error.message);
});
// Promise.allSettled - wait for all promises to settle (resolve or reject)
Promise.allSettled([
fetchUserData(1),
fetchUserData(-1), // This will reject
fetchUserData(2)
])
.then(results => {
console.log('All promises settled:');
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
console.log(`User ${index + 1}:`, result.value.name);
} else {
console.log(`User ${index + 1} failed:`, result.reason.message);
}
});
});
// Promise.race - resolve with the first promise that completes
Promise.race([
fetchUserData(1),
delay(500).then(() => 'Timeout')
])
.then(result => {
console.log('Race result:', result);
});Async/Await
处理异步操作的现代方式:
javascript
// async-await.js
const fs = require('fs').promises;
// Basic async/await
async function readPackageInfo() {
try {
const data = await fs.readFile('package.json', 'utf8');
const packageInfo = JSON.parse(data);
return packageInfo;
} catch (error) {
console.error('Error reading package.json:', error.message);
throw error;
}
}
// Sequential operations
async function processFilesSequentially() {
try {
console.log('Starting sequential processing...');
const file1 = await fs.readFile('package.json', 'utf8');
console.log('File 1 read, length:', file1.length);
await delay(1000); // Wait 1 second
const packageInfo = JSON.parse(file1);
console.log('Package parsed:', packageInfo.name);
return packageInfo;
} catch (error) {
console.error('Sequential processing error:', error.message);
throw error;
}
}
// Parallel operations
async function processFilesParallel() {
try {
console.log('Starting parallel processing...');
const [file1, file2] = await Promise.all([
fs.readFile('package.json', 'utf8'),
delay(1000).then(() => 'delayed result')
]);
console.log('Both operations completed');
console.log('File length:', file1.length);
console.log('Delayed result:', file2);
return { file1, file2 };
} catch (error) {
console.error('Parallel processing error:', error.message);
throw error;
}
}
// Error handling with async/await
async function robustFileProcessor() {
const filenames = ['package.json', 'nonexistent.txt', 'README.md'];
const results = [];
for (const filename of filenames) {
try {
const data = await fs.readFile(filename, 'utf8');
results.push({ filename, success: true, size: data.length });
} catch (error) {
results.push({ filename, success: false, error: error.message });
}
}
return results;
}
// Using async/await with loops
async function processItemsSequentially(items) {
const results = [];
for (const item of items) {
try {
const result = await processItem(item);
results.push(result);
} catch (error) {
console.error(`Error processing ${item}:`, error.message);
}
}
return results;
}
async function processItemsConcurrently(items) {
const promises = items.map(item => processItem(item));
try {
const results = await Promise.all(promises);
return results;
} catch (error) {
console.error('Error in concurrent processing:', error.message);
throw error;
}
}
async function processItem(item) {
await delay(Math.random() * 1000);
return `Processed: ${item}`;
}
// Helper function
function delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Usage examples
async function main() {
try {
// Sequential processing
await processFilesSequentially();
// Parallel processing
await processFilesParallel();
// Robust error handling
const results = await robustFileProcessor();
console.log('File processing results:', results);
// Process items
const items = ['item1', 'item2', 'item3'];
const sequentialResults = await processItemsSequentially(items);
console.log('Sequential results:', sequentialResults);
} catch (error) {
console.error('Main function error:', error.message);
}
}
main();深入理解流
理解流
流是处理数据的强大抽象:
javascript
// streams-advanced.js
const { Readable, Writable, Transform, pipeline } = require('stream');
const fs = require('fs');
// Custom Readable Stream
class NumberGenerator extends Readable {
constructor(options) {
super(options);
this.current = 1;
this.max = options.max || 10;
}
_read() {
if (this.current <= this.max) {
this.push(`${this.current}\n`);
this.current++;
} else {
this.push(null); // End of stream
}
}
}
// Custom Transform Stream
class NumberSquarer extends Transform {
_transform(chunk, encoding, callback) {
const number = parseInt(chunk.toString().trim());
const squared = number * number;
callback(null, `${number}^2 = ${squared}\n`);
}
}
// Custom Writable Stream
class Logger extends Writable {
constructor(options) {
super(options);
this.logFile = fs.createWriteStream('numbers.log', { flags: 'a' });
}
_write(chunk, encoding, callback) {
const timestamp = new Date().toISOString();
const logEntry = `[${timestamp}] ${chunk}`;
console.log('Logging:', chunk.toString().trim());
this.logFile.write(logEntry, callback);
}
_final(callback) {
this.logFile.end(callback);
}
}
// Using streams with pipeline
const numberGen = new NumberGenerator({ max: 5 });
const squarer = new NumberSquarer();
const logger = new Logger();
pipeline(
numberGen,
squarer,
logger,
(error) => {
if (error) {
console.error('Pipeline error:', error);
} else {
console.log('Pipeline completed successfully');
}
}
);流工具
javascript
// stream-utilities.js
const { pipeline, Transform } = require('stream');
const fs = require('fs');
// CSV Parser Transform Stream
class CSVParser extends Transform {
constructor(options = {}) {
super({ objectMode: true });
this.headers = null;
this.delimiter = options.delimiter || ',';
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(this.delimiter);
if (!this.headers) {
this.headers = values;
continue;
}
const obj = {};
this.headers.forEach((header, index) => {
obj[header.trim()] = values[index]?.trim() || '';
});
this.push(obj);
}
callback();
}
}
// JSON Stringifier Transform Stream
class JSONStringifier extends Transform {
constructor() {
super({ objectMode: true });
this.first = true;
}
_transform(chunk, encoding, callback) {
if (this.first) {
this.push('[\n');
this.first = false;
} else {
this.push(',\n');
}
this.push(JSON.stringify(chunk, null, 2));
callback();
}
_flush(callback) {
this.push('\n]');
callback();
}
}
// Usage example
function convertCSVToJSON(inputFile, outputFile) {
return new Promise((resolve, reject) => {
pipeline(
fs.createReadStream(inputFile),
new CSVParser(),
new JSONStringifier(),
fs.createWriteStream(outputFile),
(error) => {
if (error) {
reject(error);
} else {
resolve();
}
}
);
});
}内存管理
理解内存使用
javascript
// memory-management.js
function analyzeMemoryUsage() {
const usage = process.memoryUsage();
console.log('Memory Usage:');
console.log(`RSS: ${formatBytes(usage.rss)} - Resident Set Size`);
console.log(`Heap Total: ${formatBytes(usage.heapTotal)} - Total heap allocated`);
console.log(`Heap Used: ${formatBytes(usage.heapUsed)} - Heap actually used`);
console.log(`External: ${formatBytes(usage.external)} - C++ objects bound to JS`);
console.log(`Array Buffers: ${formatBytes(usage.arrayBuffers)} - ArrayBuffer and SharedArrayBuffer`);
return usage;
}
function formatBytes(bytes) {
const mb = bytes / 1024 / 1024;
return `${mb.toFixed(2)} MB`;
}
// Memory leak example (what to avoid)
class MemoryLeakExample {
constructor() {
this.data = [];
this.interval = setInterval(() => {
// This creates a memory leak
this.data.push(new Array(1000000).fill('data'));
}, 1000);
}
// Proper cleanup
cleanup() {
clearInterval(this.interval);
this.data = null;
}
}
// Proper memory management
class DataProcessor {
constructor() {
this.cache = new Map();
this.maxCacheSize = 1000;
}
process(data) {
// Process data
const result = data.toUpperCase();
// Cache with size limit
if (this.cache.size >= this.maxCacheSize) {
const firstKey = this.cache.keys().next().value;
this.cache.delete(firstKey);
}
this.cache.set(data, result);
return result;
}
clearCache() {
this.cache.clear();
}
}
// Monitor memory usage
function monitorMemory(interval = 5000) {
const monitor = setInterval(() => {
analyzeMemoryUsage();
console.log('---');
}, interval);
// Cleanup function
return () => clearInterval(monitor);
}
// Usage
console.log('Initial memory usage:');
analyzeMemoryUsage();
const processor = new DataProcessor();
const stopMonitoring = monitorMemory();
// Simulate work
setTimeout(() => {
console.log('Stopping memory monitor...');
stopMonitoring();
}, 15000);错误处理策略
全面的错误处理
javascript
// error-handling-strategies.js
const EventEmitter = require('events');
// Custom error classes
class ValidationError extends Error {
constructor(message, field) {
super(message);
this.name = 'ValidationError';
this.field = field;
}
}
class DatabaseError extends Error {
constructor(message, code) {
super(message);
this.name = 'DatabaseError';
this.code = code;
}
}
// Error handling service
class ErrorHandler extends EventEmitter {
constructor() {
super();
this.setupGlobalHandlers();
}
setupGlobalHandlers() {
// Handle uncaught exceptions
process.on('uncaughtException', (error) => {
console.error('Uncaught Exception:', error);
this.emit('criticalError', error);
process.exit(1);
});
// Handle unhandled promise rejections
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
this.emit('unhandledRejection', { reason, promise });
});
// Handle warnings
process.on('warning', (warning) => {
console.warn('Warning:', warning.message);
this.emit('warning', warning);
});
}
handleError(error, context = {}) {
const errorInfo = {
message: error.message,
stack: error.stack,
name: error.name,
timestamp: new Date().toISOString(),
context
};
// Log error based on type
if (error instanceof ValidationError) {
console.warn('Validation Error:', errorInfo);
this.emit('validationError', errorInfo);
} else if (error instanceof DatabaseError) {
console.error('Database Error:', errorInfo);
this.emit('databaseError', errorInfo);
} else {
console.error('General Error:', errorInfo);
this.emit('error', errorInfo);
}
return errorInfo;
}
// Async error wrapper
asyncWrapper(fn) {
return async (...args) => {
try {
return await fn(...args);
} catch (error) {
this.handleError(error, { function: fn.name, args });
throw error;
}
};
}
}
// Usage example
const errorHandler = new ErrorHandler();
// Listen for different error types
errorHandler.on('validationError', (errorInfo) => {
// Send to monitoring service
console.log('Sending validation error to monitoring...');
});
errorHandler.on('databaseError', (errorInfo) => {
// Alert database team
console.log('Alerting database team...');
});
// Example functions with error handling
async function validateUser(userData) {
if (!userData.email) {
throw new ValidationError('Email is required', 'email');
}
if (!userData.email.includes('@')) {
throw new ValidationError('Invalid email format', 'email');
}
return true;
}
async function saveUser(userData) {
// Simulate database error
if (userData.email === 'error@test.com') {
throw new DatabaseError('Connection timeout', 'TIMEOUT');
}
return { id: Date.now(), ...userData };
}
// Wrapped functions
const safeValidateUser = errorHandler.asyncWrapper(validateUser);
const safeSaveUser = errorHandler.asyncWrapper(saveUser);
// Test error handling
async function testErrorHandling() {
try {
await safeValidateUser({ email: 'invalid-email' });
} catch (error) {
console.log('Caught validation error');
}
try {
await safeSaveUser({ email: 'error@test.com' });
} catch (error) {
console.log('Caught database error');
}
}
testErrorHandling();下一步
在下一章中,我们将探索如何有效地使用组件和模块组织 Node.js 应用程序。
实践练习
- 为实时通知系统创建自定义 EventEmitter
- 构建基于流的文件处理管道
- 使用流实现内存高效的数据处理器
- 为 Web 应用程序创建全面的错误处理系统
关键要点
- 事件驱动编程是 Node.js 架构的基础
- EventEmitter 为许多 Node.js 模式提供基础
- Async/await 是处理异步操作的首选方式
- 流支持高效处理大型数据集
- 适当的错误处理防止应用程序崩溃
- 内存管理对于长时间运行的应用程序至关重要
- 理解事件循环有助于优化性能