Skip to content

状态管理

概述

Node.js 应用程序中的状态管理涉及处理跨请求持久化的数据、管理用户会话、缓存频繁访问的数据以及协调应用程序不同部分之间的状态。本章介绍各种状态管理策略和实现。

会话管理

使用内存存储的 Express 会话

javascript
// session-basic.js
const express = require('express');
const session = require('express-session');

const app = express();

// Basic session configuration
app.use(session({
  secret: 'your-secret-key',
  resave: false,
  saveUninitialized: false,
  cookie: {
    secure: false, // Set to true in production with HTTPS
    httpOnly: true,
    maxAge: 24 * 60 * 60 * 1000 // 24 hours
  }
}));

app.use(express.json());

// Login route
app.post('/login', (req, res) => {
  const { username, password } = req.body;
  
  // Simple authentication (use proper authentication in production)
  if (username === 'admin' && password === 'password') {
    req.session.user = {
      id: 1,
      username: 'admin',
      role: 'administrator'
    };
    
    req.session.loginTime = new Date();
    
    res.json({
      message: 'Login successful',
      user: req.session.user
    });
  } else {
    res.status(401).json({ error: 'Invalid credentials' });
  }
});

// Protected route
app.get('/profile', (req, res) => {
  if (!req.session.user) {
    return res.status(401).json({ error: 'Not authenticated' });
  }
  
  res.json({
    user: req.session.user,
    loginTime: req.session.loginTime,
    sessionId: req.sessionID
  });
});

// Update session data
app.post('/preferences', (req, res) => {
  if (!req.session.user) {
    return res.status(401).json({ error: 'Not authenticated' });
  }
  
  req.session.preferences = req.body;
  
  res.json({
    message: 'Preferences updated',
    preferences: req.session.preferences
  });
});

// Logout route
app.post('/logout', (req, res) => {
  req.session.destroy((err) => {
    if (err) {
      return res.status(500).json({ error: 'Could not log out' });
    }
    
    res.clearCookie('connect.sid'); // Default session cookie name
    res.json({ message: 'Logout successful' });
  });
});

app.listen(3000, () => {
  console.log('Server running on http://localhost:3000');
});

使用 Redis 存储的会话

javascript
// session-redis.js
const express = require('express');
const session = require('express-session');
const RedisStore = require('connect-redis')(session);
const redis = require('redis');

const app = express();

// Create Redis client
const redisClient = redis.createClient({
  host: 'localhost',
  port: 6379,
  // password: 'your-redis-password'
});

redisClient.on('error', (err) => {
  console.error('Redis error:', err);
});

redisClient.on('connect', () => {
  console.log('Connected to Redis');
});

// Session configuration with Redis store
app.use(session({
  store: new RedisStore({ client: redisClient }),
  secret: 'your-secret-key',
  resave: false,
  saveUninitialized: false,
  cookie: {
    secure: false,
    httpOnly: true,
    maxAge: 24 * 60 * 60 * 1000 // 24 hours
  }
}));

app.use(express.json());

// Session middleware to track user activity
app.use((req, res, next) => {
  if (req.session.user) {
    req.session.lastActivity = new Date();
  }
  next();
});

// Enhanced login with session tracking
app.post('/login', async (req, res) => {
  const { username, password } = req.body;
  
  if (username === 'admin' && password === 'password') {
    req.session.user = {
      id: 1,
      username: 'admin',
      role: 'administrator'
    };
    
    req.session.loginTime = new Date();
    req.session.loginCount = (req.session.loginCount || 0) + 1;
    
    // Store additional session data in Redis
    await redisClient.setex(
      `user:${req.session.user.id}:active_session`,
      3600, // 1 hour TTL
      req.sessionID
    );
    
    res.json({
      message: 'Login successful',
      user: req.session.user,
      loginCount: req.session.loginCount
    });
  } else {
    res.status(401).json({ error: 'Invalid credentials' });
  }
});

// Get active sessions for a user
app.get('/sessions', async (req, res) => {
  if (!req.session.user) {
    return res.status(401).json({ error: 'Not authenticated' });
  }
  
  try {
    const activeSession = await redisClient.get(`user:${req.session.user.id}:active_session`);
    
    res.json({
      currentSession: req.sessionID,
      activeSession,
      isCurrentActive: req.sessionID === activeSession
    });
  } catch (error) {
    res.status(500).json({ error: 'Failed to get session info' });
  }
});

app.listen(3000);

内存状态管理

应用程序状态管理器

javascript
// state-manager.js
const EventEmitter = require('events');

class StateManager extends EventEmitter {
  constructor() {
    super();
    this.state = new Map();
    this.subscribers = new Map();
    this.middleware = [];
  }

  // Add middleware for state changes
  use(middleware) {
    this.middleware.push(middleware);
  }

  // Get state value
  get(key) {
    return this.state.get(key);
  }

  // Set state value with middleware and events
  async set(key, value, context = {}) {
    const oldValue = this.state.get(key);
    
    // Run middleware
    let newValue = value;
    for (const middleware of this.middleware) {
      newValue = await middleware(key, newValue, oldValue, context);
    }
    
    this.state.set(key, newValue);
    
    // Emit change event
    this.emit('change', {
      key,
      oldValue,
      newValue,
      context
    });
    
    // Emit specific key change event
    this.emit(`change:${key}`, {
      oldValue,
      newValue,
      context
    });
    
    return newValue;
  }

  // Update state with a function
  async update(key, updater, context = {}) {
    const currentValue = this.get(key);
    const newValue = updater(currentValue);
    return this.set(key, newValue, context);
  }

  // Delete state
  delete(key) {
    const oldValue = this.state.get(key);
    const deleted = this.state.delete(key);
    
    if (deleted) {
      this.emit('delete', { key, oldValue });
      this.emit(`delete:${key}`, { oldValue });
    }
    
    return deleted;
  }

  // Check if key exists
  has(key) {
    return this.state.has(key);
  }

  // Get all keys
  keys() {
    return Array.from(this.state.keys());
  }

  // Get all values
  values() {
    return Array.from(this.state.values());
  }

  // Get state size
  size() {
    return this.state.size;
  }

  // Clear all state
  clear() {
    const oldState = new Map(this.state);
    this.state.clear();
    this.emit('clear', { oldState });
  }

  // Subscribe to state changes
  subscribe(key, callback) {
    const eventName = `change:${key}`;
    this.on(eventName, callback);
    
    // Return unsubscribe function
    return () => this.off(eventName, callback);
  }

  // Get snapshot of current state
  getSnapshot() {
    return Object.fromEntries(this.state);
  }

  // Restore state from snapshot
  restoreSnapshot(snapshot) {
    this.clear();
    for (const [key, value] of Object.entries(snapshot)) {
      this.state.set(key, value);
    }
    this.emit('restore', { snapshot });
  }
}

// Usage example
const stateManager = new StateManager();

// Add logging middleware
stateManager.use(async (key, newValue, oldValue, context) => {
  console.log(`State change: ${key}`, {
    from: oldValue,
    to: newValue,
    context
  });
  return newValue;
});

// Add validation middleware
stateManager.use(async (key, newValue, oldValue, context) => {
  if (key === 'userCount' && typeof newValue !== 'number') {
    throw new Error('userCount must be a number');
  }
  return newValue;
});

// Subscribe to changes
const unsubscribe = stateManager.subscribe('userCount', ({ oldValue, newValue }) => {
  console.log(`User count changed from ${oldValue} to ${newValue}`);
});

// Set initial state
stateManager.set('userCount', 0);
stateManager.set('appName', 'My Node.js App');

// Update state
stateManager.update('userCount', count => count + 1);

module.exports = StateManager;

用户会话存储

javascript
// user-session-store.js
class UserSessionStore {
  constructor() {
    this.sessions = new Map();
    this.userSessions = new Map(); // userId -> Set of sessionIds
    this.sessionTimeout = 30 * 60 * 1000; // 30 minutes
    this.cleanupInterval = 5 * 60 * 1000; // 5 minutes
    
    this.startCleanup();
  }

  createSession(userId, sessionData = {}) {
    const sessionId = this.generateSessionId();
    const session = {
      id: sessionId,
      userId,
      data: sessionData,
      createdAt: new Date(),
      lastActivity: new Date(),
      isActive: true
    };

    this.sessions.set(sessionId, session);
    
    // Track user sessions
    if (!this.userSessions.has(userId)) {
      this.userSessions.set(userId, new Set());
    }
    this.userSessions.get(userId).add(sessionId);

    return session;
  }

  getSession(sessionId) {
    const session = this.sessions.get(sessionId);
    
    if (session && this.isSessionValid(session)) {
      session.lastActivity = new Date();
      return session;
    }
    
    return null;
  }

  updateSession(sessionId, data) {
    const session = this.sessions.get(sessionId);
    
    if (session && this.isSessionValid(session)) {
      session.data = { ...session.data, ...data };
      session.lastActivity = new Date();
      return session;
    }
    
    return null;
  }

  destroySession(sessionId) {
    const session = this.sessions.get(sessionId);
    
    if (session) {
      // Remove from user sessions
      const userSessions = this.userSessions.get(session.userId);
      if (userSessions) {
        userSessions.delete(sessionId);
        if (userSessions.size === 0) {
          this.userSessions.delete(session.userId);
        }
      }
      
      return this.sessions.delete(sessionId);
    }
    
    return false;
  }

  getUserSessions(userId) {
    const sessionIds = this.userSessions.get(userId);
    
    if (!sessionIds) {
      return [];
    }
    
    return Array.from(sessionIds)
      .map(id => this.sessions.get(id))
      .filter(session => session && this.isSessionValid(session));
  }

  destroyUserSessions(userId) {
    const sessionIds = this.userSessions.get(userId);
    
    if (sessionIds) {
      for (const sessionId of sessionIds) {
        this.sessions.delete(sessionId);
      }
      this.userSessions.delete(userId);
      return sessionIds.size;
    }
    
    return 0;
  }

  isSessionValid(session) {
    if (!session.isActive) {
      return false;
    }
    
    const now = new Date();
    const timeSinceActivity = now - session.lastActivity;
    
    return timeSinceActivity < this.sessionTimeout;
  }

  generateSessionId() {
    return require('crypto').randomBytes(32).toString('hex');
  }

  startCleanup() {
    setInterval(() => {
      this.cleanupExpiredSessions();
    }, this.cleanupInterval);
  }

  cleanupExpiredSessions() {
    const now = new Date();
    let cleanedCount = 0;
    
    for (const [sessionId, session] of this.sessions) {
      if (!this.isSessionValid(session)) {
        this.destroySession(sessionId);
        cleanedCount++;
      }
    }
    
    if (cleanedCount > 0) {
      console.log(`Cleaned up ${cleanedCount} expired sessions`);
    }
  }

  getStats() {
    return {
      totalSessions: this.sessions.size,
      activeUsers: this.userSessions.size,
      averageSessionsPerUser: this.sessions.size / Math.max(this.userSessions.size, 1)
    };
  }
}

module.exports = UserSessionStore;

缓存策略

多级缓存

javascript
// cache-manager.js
const NodeCache = require('node-cache');
const redis = require('redis');

class CacheManager {
  constructor(options = {}) {
    // L1 Cache - In-memory (fastest)
    this.l1Cache = new NodeCache({
      stdTTL: options.l1TTL || 300, // 5 minutes
      checkperiod: options.l1CheckPeriod || 60 // 1 minute
    });

    // L2 Cache - Redis (shared across instances)
    this.l2Cache = redis.createClient(options.redis || {});
    this.l2TTL = options.l2TTL || 3600; // 1 hour

    this.stats = {
      l1Hits: 0,
      l2Hits: 0,
      misses: 0,
      sets: 0
    };

    this.setupEventHandlers();
  }

  setupEventHandlers() {
    this.l1Cache.on('set', (key, value) => {
      console.log(`L1 Cache SET: ${key}`);
    });

    this.l1Cache.on('del', (key, value) => {
      console.log(`L1 Cache DEL: ${key}`);
    });

    this.l1Cache.on('expired', (key, value) => {
      console.log(`L1 Cache EXPIRED: ${key}`);
    });
  }

  async get(key) {
    try {
      // Try L1 cache first
      const l1Value = this.l1Cache.get(key);
      if (l1Value !== undefined) {
        this.stats.l1Hits++;
        return l1Value;
      }

      // Try L2 cache
      const l2Value = await this.l2Cache.get(key);
      if (l2Value !== null) {
        this.stats.l2Hits++;
        
        // Promote to L1 cache
        const parsedValue = JSON.parse(l2Value);
        this.l1Cache.set(key, parsedValue);
        
        return parsedValue;
      }

      // Cache miss
      this.stats.misses++;
      return null;
    } catch (error) {
      console.error('Cache get error:', error);
      return null;
    }
  }

  async set(key, value, ttl = null) {
    try {
      this.stats.sets++;
      
      // Set in L1 cache
      this.l1Cache.set(key, value, ttl || this.l1Cache.options.stdTTL);
      
      // Set in L2 cache
      const serializedValue = JSON.stringify(value);
      await this.l2Cache.setex(key, ttl || this.l2TTL, serializedValue);
      
      return true;
    } catch (error) {
      console.error('Cache set error:', error);
      return false;
    }
  }

  async delete(key) {
    try {
      // Delete from both caches
      this.l1Cache.del(key);
      await this.l2Cache.del(key);
      return true;
    } catch (error) {
      console.error('Cache delete error:', error);
      return false;
    }
  }

  async clear() {
    try {
      this.l1Cache.flushAll();
      await this.l2Cache.flushall();
      return true;
    } catch (error) {
      console.error('Cache clear error:', error);
      return false;
    }
  }

  // Cache-aside pattern
  async getOrSet(key, fetchFunction, ttl = null) {
    const cachedValue = await this.get(key);
    
    if (cachedValue !== null) {
      return cachedValue;
    }

    try {
      const freshValue = await fetchFunction();
      await this.set(key, freshValue, ttl);
      return freshValue;
    } catch (error) {
      console.error('Cache getOrSet error:', error);
      throw error;
    }
  }

  // Write-through pattern
  async setAndPersist(key, value, persistFunction, ttl = null) {
    try {
      // Persist to database first
      await persistFunction(value);
      
      // Then cache
      await this.set(key, value, ttl);
      
      return true;
    } catch (error) {
      console.error('Cache setAndPersist error:', error);
      throw error;
    }
  }

  // Write-behind pattern
  async setWithDelayedPersist(key, value, persistFunction, ttl = null, delay = 5000) {
    try {
      // Cache immediately
      await this.set(key, value, ttl);
      
      // Persist after delay
      setTimeout(async () => {
        try {
          await persistFunction(value);
        } catch (error) {
          console.error('Delayed persist error:', error);
        }
      }, delay);
      
      return true;
    } catch (error) {
      console.error('Cache setWithDelayedPersist error:', error);
      throw error;
    }
  }

  getStats() {
    const total = this.stats.l1Hits + this.stats.l2Hits + this.stats.misses;
    
    return {
      ...this.stats,
      total,
      l1HitRate: total > 0 ? (this.stats.l1Hits / total * 100).toFixed(2) + '%' : '0%',
      l2HitRate: total > 0 ? (this.stats.l2Hits / total * 100).toFixed(2) + '%' : '0%',
      missRate: total > 0 ? (this.stats.misses / total * 100).toFixed(2) + '%' : '0%',
      l1Size: this.l1Cache.getStats().keys,
      l1Memory: this.l1Cache.getStats().vsize
    };
  }

  resetStats() {
    this.stats = {
      l1Hits: 0,
      l2Hits: 0,
      misses: 0,
      sets: 0
    };
  }
}

module.exports = CacheManager;

缓存装饰器

javascript
// cache-decorators.js
const CacheManager = require('./cache-manager');

class CacheDecorators {
  constructor(cacheManager) {
    this.cache = cacheManager;
  }

  // Method decorator for caching
  cached(ttl = 3600, keyGenerator = null) {
    return (target, propertyName, descriptor) => {
      const originalMethod = descriptor.value;

      descriptor.value = async function(...args) {
        const cacheKey = keyGenerator 
          ? keyGenerator(propertyName, args)
          : `${target.constructor.name}:${propertyName}:${JSON.stringify(args)}`;

        // Try to get from cache
        const cachedResult = await this.cache.get(cacheKey);
        if (cachedResult !== null) {
          return cachedResult;
        }

        // Execute original method
        const result = await originalMethod.apply(this, args);
        
        // Cache the result
        await this.cache.set(cacheKey, result, ttl);
        
        return result;
      };

      return descriptor;
    };
  }

  // Cache invalidation decorator
  invalidatesCache(patterns = []) {
    return (target, propertyName, descriptor) => {
      const originalMethod = descriptor.value;

      descriptor.value = async function(...args) {
        const result = await originalMethod.apply(this, args);
        
        // Invalidate cache patterns
        for (const pattern of patterns) {
          const keys = typeof pattern === 'function' 
            ? pattern(args, result)
            : [pattern];
          
          for (const key of keys) {
            await this.cache.delete(key);
          }
        }
        
        return result;
      };

      return descriptor;
    };
  }
}

// Usage example
class UserService {
  constructor(cacheManager) {
    this.cache = cacheManager;
    this.decorators = new CacheDecorators(cacheManager);
  }

  // @cached(3600) // Cache for 1 hour
  async getUserById(userId) {
    // Simulate database call
    console.log(`Fetching user ${userId} from database`);
    return {
      id: userId,
      name: `User ${userId}`,
      email: `user${userId}@example.com`
    };
  }

  // @invalidatesCache(['user:*'])
  async updateUser(userId, userData) {
    // Simulate database update
    console.log(`Updating user ${userId} in database`);
    
    // Invalidate user cache
    await this.cache.delete(`UserService:getUserById:["${userId}"]`);
    
    return { id: userId, ...userData };
  }
}

module.exports = { CacheDecorators, UserService };

状态同步

事件驱动的状态同步

javascript
// state-sync.js
const EventEmitter = require('events');

class StateSynchronizer extends EventEmitter {
  constructor() {
    super();
    this.nodes = new Map();
    this.state = new Map();
    this.version = 0;
  }

  registerNode(nodeId, connection) {
    this.nodes.set(nodeId, {
      id: nodeId,
      connection,
      lastSync: new Date(),
      version: 0
    });

    // Send current state to new node
    this.syncNode(nodeId);
    
    this.emit('nodeRegistered', nodeId);
  }

  unregisterNode(nodeId) {
    this.nodes.delete(nodeId);
    this.emit('nodeUnregistered', nodeId);
  }

  setState(key, value, sourceNode = null) {
    const oldValue = this.state.get(key);
    this.state.set(key, value);
    this.version++;

    const change = {
      key,
      value,
      oldValue,
      version: this.version,
      timestamp: new Date(),
      sourceNode
    };

    // Broadcast to all nodes except source
    this.broadcastChange(change, sourceNode);
    
    this.emit('stateChanged', change);
  }

  getState(key) {
    return this.state.get(key);
  }

  getAllState() {
    return Object.fromEntries(this.state);
  }

  broadcastChange(change, excludeNode = null) {
    for (const [nodeId, node] of this.nodes) {
      if (nodeId !== excludeNode) {
        try {
          node.connection.send(JSON.stringify({
            type: 'stateChange',
            data: change
          }));
        } catch (error) {
          console.error(`Failed to send to node ${nodeId}:`, error);
          this.unregisterNode(nodeId);
        }
      }
    }
  }

  syncNode(nodeId) {
    const node = this.nodes.get(nodeId);
    if (!node) return;

    const syncData = {
      type: 'fullSync',
      data: {
        state: this.getAllState(),
        version: this.version
      }
    };

    try {
      node.connection.send(JSON.stringify(syncData));
      node.lastSync = new Date();
      node.version = this.version;
    } catch (error) {
      console.error(`Failed to sync node ${nodeId}:`, error);
      this.unregisterNode(nodeId);
    }
  }

  handleMessage(nodeId, message) {
    try {
      const { type, data } = JSON.parse(message);

      switch (type) {
        case 'stateChange':
          this.handleStateChange(nodeId, data);
          break;
        case 'syncRequest':
          this.syncNode(nodeId);
          break;
        case 'heartbeat':
          this.handleHeartbeat(nodeId);
          break;
      }
    } catch (error) {
      console.error(`Invalid message from node ${nodeId}:`, error);
    }
  }

  handleStateChange(nodeId, change) {
    if (change.version > this.version) {
      this.state.set(change.key, change.value);
      this.version = change.version;
      
      // Broadcast to other nodes
      this.broadcastChange(change, nodeId);
      
      this.emit('stateChanged', { ...change, sourceNode: nodeId });
    }
  }

  handleHeartbeat(nodeId) {
    const node = this.nodes.get(nodeId);
    if (node) {
      node.lastSync = new Date();
    }
  }

  startHeartbeat(interval = 30000) {
    setInterval(() => {
      const now = new Date();
      
      for (const [nodeId, node] of this.nodes) {
        const timeSinceSync = now - node.lastSync;
        
        if (timeSinceSync > interval * 2) {
          console.log(`Node ${nodeId} appears to be disconnected`);
          this.unregisterNode(nodeId);
        }
      }
    }, interval);
  }

  getNodeStats() {
    return {
      totalNodes: this.nodes.size,
      stateSize: this.state.size,
      version: this.version,
      nodes: Array.from(this.nodes.entries()).map(([id, node]) => ({
        id,
        lastSync: node.lastSync,
        version: node.version
      }))
    };
  }
}

module.exports = StateSynchronizer;

下一步

在下一章中,我们将探索 Node.js 中的函数和方法,包括高级函数模式、闭包和函数式编程概念。

实践练习

  1. 使用 Redis 实现分布式会话存储
  2. 创建带有自动缓存预热的多级缓存系统
  3. 使用 WebSocket 构建实时状态同步系统
  4. 实现带有自动失效的缓存旁路模式

关键要点

  • 会话管理在无状态 HTTP 中实现有状态交互
  • 内存状态管理提供快速访问应用程序数据
  • 多级缓存提高性能并减少数据库负载
  • 状态同步启用分布式应用程序架构
  • 正确的缓存失效策略防止过期数据问题
  • 事件驱动的状态管理启用响应式应用程序
  • 监控和统计有助于优化状态管理性能

本站内容仅供学习和研究使用。