状态管理
概述
Node.js 应用程序中的状态管理涉及处理跨请求持久化的数据、管理用户会话、缓存频繁访问的数据以及协调应用程序不同部分之间的状态。本章介绍各种状态管理策略和实现。
会话管理
使用内存存储的 Express 会话
javascript
// session-basic.js
// Minimal Express application demonstrating cookie-based sessions kept in
// express-session's default in-memory store.
const express = require('express');
const session = require('express-session');
const app = express();

// Basic session configuration
app.use(session({
  // Prefer a secret supplied through the environment; the literal is only a
  // development fallback and must be replaced before deploying.
  secret: process.env.SESSION_SECRET || 'your-secret-key',
  resave: false,
  saveUninitialized: false,
  cookie: {
    secure: false, // Set to true in production with HTTPS
    httpOnly: true,
    maxAge: 24 * 60 * 60 * 1000 // 24 hours
  }
}));
app.use(express.json());

// Login route
app.post('/login', (req, res) => {
  // Tolerate a missing body so a malformed request yields 401, not a TypeError.
  const { username, password } = req.body || {};

  // Simple authentication (use proper authentication in production)
  if (username !== 'admin' || password !== 'password') {
    return res.status(401).json({ error: 'Invalid credentials' });
  }

  // Issue a fresh session ID on login so that an ID obtained before
  // authentication (session fixation) never becomes an authenticated session.
  req.session.regenerate((err) => {
    if (err) {
      return res.status(500).json({ error: 'Could not log in' });
    }
    req.session.user = {
      id: 1,
      username: 'admin',
      role: 'administrator'
    };
    req.session.loginTime = new Date();
    res.json({
      message: 'Login successful',
      user: req.session.user
    });
  });
});

// Protected route: returns the session's user, login time and session ID.
app.get('/profile', (req, res) => {
  if (!req.session.user) {
    return res.status(401).json({ error: 'Not authenticated' });
  }
  res.json({
    user: req.session.user,
    loginTime: req.session.loginTime,
    sessionId: req.sessionID
  });
});

// Update session data: stores the request body as the user's preferences.
app.post('/preferences', (req, res) => {
  if (!req.session.user) {
    return res.status(401).json({ error: 'Not authenticated' });
  }
  req.session.preferences = req.body;
  res.json({
    message: 'Preferences updated',
    preferences: req.session.preferences
  });
});

// Logout route: destroys the session and clears its cookie.
app.post('/logout', (req, res) => {
  req.session.destroy((err) => {
    if (err) {
      return res.status(500).json({ error: 'Could not log out' });
    }
    res.clearCookie('connect.sid'); // Default session cookie name
    res.json({ message: 'Logout successful' });
  });
});
app.listen(3000, () => {
console.log('Server running on http://localhost:3000');
});

使用 Redis 存储的会话
javascript
// session-redis.js
// Express application whose sessions are stored in Redis, plus a per-user
// "active session" marker kept in Redis alongside the session itself.
const { promisify } = require('util');
const express = require('express');
const session = require('express-session');
const RedisStore = require('connect-redis')(session);
const redis = require('redis');
const app = express();

// Create Redis client
const redisClient = redis.createClient({
  host: 'localhost',
  port: 6379,
  // password: 'your-redis-password'
});
redisClient.on('error', (err) => {
  console.error('Redis error:', err);
});
redisClient.on('connect', () => {
  console.log('Connected to Redis');
});

// NOTE(review): `createClient({ host, port })` together with
// `require('connect-redis')(session)` implies the callback-based redis client.
// Its commands do not return promises, so awaiting them directly neither waits
// for the command nor yields the reply. Wrap the commands that are awaited.
const redisGet = promisify(redisClient.get).bind(redisClient);
const redisSetex = promisify(redisClient.setex).bind(redisClient);

// Replaces the request's session with a fresh one (new session ID).
// Resolves once the new session is available on `req.session`.
function regenerateSession(req) {
  return new Promise((resolve, reject) => {
    req.session.regenerate((err) => (err ? reject(err) : resolve()));
  });
}

// Session configuration with Redis store
app.use(session({
  store: new RedisStore({ client: redisClient }),
  secret: 'your-secret-key',
  resave: false,
  saveUninitialized: false,
  cookie: {
    secure: false,
    httpOnly: true,
    maxAge: 24 * 60 * 60 * 1000 // 24 hours
  }
}));
app.use(express.json());

// Session middleware to track user activity
app.use((req, res, next) => {
  // Guard against a missing session object before touching it.
  if (req.session && req.session.user) {
    req.session.lastActivity = new Date();
  }
  next();
});

// Enhanced login with session tracking
app.post('/login', async (req, res) => {
  // Tolerate a missing body so a malformed request yields 401, not a TypeError.
  const { username, password } = req.body || {};
  if (username !== 'admin' || password !== 'password') {
    return res.status(401).json({ error: 'Invalid credentials' });
  }

  try {
    // Carry the counter across regeneration, which starts from an empty session.
    const previousLogins = req.session.loginCount || 0;

    // New session ID on login prevents session fixation.
    await regenerateSession(req);

    req.session.user = {
      id: 1,
      username: 'admin',
      role: 'administrator'
    };
    req.session.loginTime = new Date();
    req.session.loginCount = previousLogins + 1;

    // Store additional session data in Redis
    await redisSetex(
      `user:${req.session.user.id}:active_session`,
      3600, // 1 hour TTL
      req.sessionID
    );

    res.json({
      message: 'Login successful',
      user: req.session.user,
      loginCount: req.session.loginCount
    });
  } catch (error) {
    // Without this catch a Redis failure would surface as an unhandled
    // rejection and the request would never receive a response.
    console.error('Login failed:', error);
    res.status(500).json({ error: 'Login failed' });
  }
});

// Get active sessions for a user
app.get('/sessions', async (req, res) => {
  if (!req.session.user) {
    return res.status(401).json({ error: 'Not authenticated' });
  }
  try {
    const activeSession = await redisGet(`user:${req.session.user.id}:active_session`);
    res.json({
      currentSession: req.sessionID,
      activeSession,
      isCurrentActive: req.sessionID === activeSession
    });
  } catch (error) {
    res.status(500).json({ error: 'Failed to get session info' });
  }
});
app.listen(3000);

内存状态管理
应用程序状态管理器
javascript
// state-manager.js
const EventEmitter = require('events');
/**
 * Event-emitting key/value state container.
 *
 * Values pass through a chain of middleware before being stored. Every write
 * emits `change` and `change:<key>`; deletions emit `delete` and
 * `delete:<key>`; `clear()` emits `clear`; `restoreSnapshot()` emits `restore`.
 */
class StateManager extends EventEmitter {
  constructor() {
    super();
    this.state = new Map();
    this.subscribers = new Map();
    this.middleware = [];
  }

  /**
   * Appends a middleware function to the chain run by `set()`.
   * @param {Function} middleware - Called as (key, newValue, oldValue, context);
   *   its (awaited) return value becomes the value passed on.
   */
  use(middleware) {
    this.middleware.push(middleware);
  }

  /** Returns the value stored under `key`, or `undefined`. */
  get(key) {
    return this.state.get(key);
  }

  /**
   * Runs `value` through every middleware in registration order, stores the
   * result and emits the change events.
   * @returns {Promise<*>} The value that was actually stored.
   */
  async set(key, value, context = {}) {
    const previous = this.state.get(key);

    let candidate = value;
    for (let index = 0; index < this.middleware.length; index += 1) {
      const transform = this.middleware[index];
      candidate = await transform(key, candidate, previous, context);
    }

    this.state.set(key, candidate);

    const payload = { oldValue: previous, newValue: candidate, context };
    // Generic event first, then the key-specific one.
    this.emit('change', { key, ...payload });
    this.emit(`change:${key}`, { ...payload });

    return candidate;
  }

  /**
   * Computes a new value from the current one and stores it through `set()`.
   * @param {Function} updater - Receives the current value, returns the next.
   */
  async update(key, updater, context = {}) {
    const nextValue = updater(this.get(key));
    return this.set(key, nextValue, context);
  }

  /**
   * Removes `key`; emits `delete` events only when something was removed.
   * @returns {boolean} Whether the key existed.
   */
  delete(key) {
    const oldValue = this.state.get(key);
    if (!this.state.delete(key)) {
      return false;
    }
    this.emit('delete', { key, oldValue });
    this.emit(`delete:${key}`, { oldValue });
    return true;
  }

  /** Reports whether `key` is present. */
  has(key) {
    return this.state.has(key);
  }

  /** Returns all keys as an array. */
  keys() {
    return [...this.state.keys()];
  }

  /** Returns all values as an array. */
  values() {
    return [...this.state.values()];
  }

  /** Returns the number of stored entries. */
  size() {
    return this.state.size;
  }

  /** Empties the state and emits `clear` with a copy of the previous state. */
  clear() {
    const oldState = new Map(this.state);
    this.state.clear();
    this.emit('clear', { oldState });
  }

  /**
   * Listens for changes to a single key.
   * @returns {Function} Call it to remove the listener again.
   */
  subscribe(key, callback) {
    const eventName = `change:${key}`;
    this.on(eventName, callback);
    return () => this.removeListener(eventName, callback);
  }

  /** Returns the current state as a plain object. */
  getSnapshot() {
    return Object.fromEntries(this.state.entries());
  }

  /**
   * Replaces the whole state with the entries of `snapshot`. The values are
   * written directly, bypassing middleware and change events.
   */
  restoreSnapshot(snapshot) {
    this.clear();
    Object.entries(snapshot).forEach(([key, value]) => {
      this.state.set(key, value);
    });
    this.emit('restore', { snapshot });
  }
}
// Usage example
const stateManager = new StateManager();

// Add logging middleware
stateManager.use(async (key, newValue, oldValue, context) => {
  console.log(`State change: ${key}`, {
    from: oldValue,
    to: newValue,
    context
  });
  return newValue;
});

// Add validation middleware
stateManager.use(async (key, newValue, oldValue, context) => {
  if (key === 'userCount' && typeof newValue !== 'number') {
    throw new Error('userCount must be a number');
  }
  return newValue;
});

// Subscribe to changes
const unsubscribe = stateManager.subscribe('userCount', ({ oldValue, newValue }) => {
  console.log(`User count changed from ${oldValue} to ${newValue}`);
});

// set() and update() are async: the value is only stored after the middleware
// chain has been awaited. Each step must therefore be awaited in turn;
// otherwise update() reads `undefined` before the initial set() has landed and
// stores NaN. The catch keeps a middleware rejection from going unhandled.
(async () => {
  // Set initial state
  await stateManager.set('userCount', 0);
  await stateManager.set('appName', 'My Node.js App');
  // Update state
  await stateManager.update('userCount', (count) => count + 1);
})().catch((error) => {
  console.error('State initialization failed:', error);
});
module.exports = StateManager;

用户会话存储
javascript
// user-session-store.js
/**
 * In-memory session store indexed both by session ID and by user ID.
 *
 * Sessions expire after `sessionTimeout` milliseconds without activity and
 * are removed by a periodic cleanup timer.
 */
class UserSessionStore {
  /**
   * @param {object} [options]
   * @param {number} [options.sessionTimeout=1800000] - Idle time in ms after
   *   which a session is no longer valid (default 30 minutes).
   * @param {number} [options.cleanupInterval=300000] - Period in ms of the
   *   expired-session sweep (default 5 minutes).
   */
  constructor({
    sessionTimeout = 30 * 60 * 1000,
    cleanupInterval = 5 * 60 * 1000
  } = {}) {
    this.sessions = new Map();
    this.userSessions = new Map(); // userId -> Set of sessionIds
    this.sessionTimeout = sessionTimeout;
    this.cleanupInterval = cleanupInterval;
    this.cleanupTimer = null;
    this.startCleanup();
  }

  /**
   * Creates and registers a new session for `userId`.
   * @returns {object} The session record.
   */
  createSession(userId, sessionData = {}) {
    const sessionId = this.generateSessionId();
    const session = {
      id: sessionId,
      userId,
      data: sessionData,
      createdAt: new Date(),
      lastActivity: new Date(),
      isActive: true
    };
    this.sessions.set(sessionId, session);

    // Track user sessions
    if (!this.userSessions.has(userId)) {
      this.userSessions.set(userId, new Set());
    }
    this.userSessions.get(userId).add(sessionId);
    return session;
  }

  /**
   * Returns the session if it exists and is still valid, refreshing its
   * activity timestamp; otherwise returns null.
   */
  getSession(sessionId) {
    const session = this.sessions.get(sessionId);
    if (!session || !this.isSessionValid(session)) {
      return null;
    }
    session.lastActivity = new Date();
    return session;
  }

  /**
   * Shallow-merges `data` into a valid session's data and refreshes its
   * activity timestamp. Returns the session, or null if missing/expired.
   */
  updateSession(sessionId, data) {
    const session = this.sessions.get(sessionId);
    if (!session || !this.isSessionValid(session)) {
      return null;
    }
    session.data = { ...session.data, ...data };
    session.lastActivity = new Date();
    return session;
  }

  /**
   * Removes one session and its entry in the per-user index.
   * @returns {boolean} Whether a session was removed.
   */
  destroySession(sessionId) {
    const session = this.sessions.get(sessionId);
    if (!session) {
      return false;
    }
    // Remove from user sessions
    const userSessions = this.userSessions.get(session.userId);
    if (userSessions) {
      userSessions.delete(sessionId);
      if (userSessions.size === 0) {
        this.userSessions.delete(session.userId);
      }
    }
    return this.sessions.delete(sessionId);
  }

  /** Returns all currently valid sessions of `userId` (possibly empty). */
  getUserSessions(userId) {
    const sessionIds = this.userSessions.get(userId);
    if (!sessionIds) {
      return [];
    }
    return Array.from(sessionIds)
      .map((id) => this.sessions.get(id))
      .filter((session) => session && this.isSessionValid(session));
  }

  /**
   * Removes every session of `userId`.
   * @returns {number} How many session IDs were tracked for the user.
   */
  destroyUserSessions(userId) {
    const sessionIds = this.userSessions.get(userId);
    if (!sessionIds) {
      return 0;
    }
    for (const sessionId of sessionIds) {
      this.sessions.delete(sessionId);
    }
    this.userSessions.delete(userId);
    return sessionIds.size;
  }

  /** A session is valid while active and idle for less than the timeout. */
  isSessionValid(session) {
    if (!session.isActive) {
      return false;
    }
    const timeSinceActivity = new Date() - session.lastActivity;
    return timeSinceActivity < this.sessionTimeout;
  }

  /** Returns 32 random bytes from the crypto module, hex-encoded. */
  generateSessionId() {
    return require('crypto').randomBytes(32).toString('hex');
  }

  /**
   * Starts the periodic sweep if it is not already running.
   * The timer is unref'd so an idle store does not keep the process alive.
   * @returns {object} The interval handle.
   */
  startCleanup() {
    if (this.cleanupTimer) {
      return this.cleanupTimer;
    }
    this.cleanupTimer = setInterval(() => {
      this.cleanupExpiredSessions();
    }, this.cleanupInterval);
    if (typeof this.cleanupTimer.unref === 'function') {
      this.cleanupTimer.unref();
    }
    return this.cleanupTimer;
  }

  /** Stops the periodic sweep; safe to call when it is not running. */
  stopCleanup() {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }
  }

  /**
   * Destroys every session that is no longer valid.
   * @returns {number} How many sessions were removed.
   */
  cleanupExpiredSessions() {
    let cleanedCount = 0;
    for (const [sessionId, session] of this.sessions) {
      if (!this.isSessionValid(session)) {
        this.destroySession(sessionId);
        cleanedCount++;
      }
    }
    if (cleanedCount > 0) {
      console.log(`Cleaned up ${cleanedCount} expired sessions`);
    }
    return cleanedCount;
  }

  /** Returns session/user counts and the average sessions per user. */
  getStats() {
    return {
      totalSessions: this.sessions.size,
      activeUsers: this.userSessions.size,
      averageSessionsPerUser: this.sessions.size / Math.max(this.userSessions.size, 1)
    };
  }
}
module.exports = UserSessionStore;

缓存策略
多级缓存
javascript
// cache-manager.js
const NodeCache = require('node-cache');
const redis = require('redis');
/**
 * Two-level cache: an in-process NodeCache (L1) in front of a Redis client
 * (L2). Values are stored in L2 as JSON strings.
 *
 * NOTE(review): L2 commands are invoked in callback style (`get`, `setex`,
 * `del`, `flushall` with a trailing node-style callback). This assumes the
 * client created by `redis.createClient` accepts callbacks; confirm against
 * the installed redis client version.
 */
class CacheManager {
  /**
   * @param {object} [options]
   * @param {number} [options.l1TTL=300] - Default L1 TTL in seconds.
   * @param {number} [options.l1CheckPeriod=60] - L1 expiry check period in seconds.
   * @param {object} [options.redis={}] - Options passed to `redis.createClient`.
   * @param {number} [options.l2TTL=3600] - Default L2 TTL in seconds.
   */
  constructor(options = {}) {
    // L1 Cache - In-memory (fastest)
    this.l1Cache = new NodeCache({
      stdTTL: options.l1TTL || 300, // 5 minutes
      checkperiod: options.l1CheckPeriod || 60 // 1 minute
    });
    // L2 Cache - Redis (shared across instances)
    this.l2Cache = redis.createClient(options.redis || {});
    this.l2TTL = options.l2TTL || 3600; // 1 hour
    this.stats = {
      l1Hits: 0,
      l2Hits: 0,
      misses: 0,
      sets: 0
    };
    this.setupEventHandlers();
  }

  /** Logs L1 lifecycle events and L2 client errors. */
  setupEventHandlers() {
    this.l1Cache.on('set', (key) => {
      console.log(`L1 Cache SET: ${key}`);
    });
    this.l1Cache.on('del', (key) => {
      console.log(`L1 Cache DEL: ${key}`);
    });
    this.l1Cache.on('expired', (key) => {
      console.log(`L1 Cache EXPIRED: ${key}`);
    });
    // An EventEmitter without an 'error' listener throws on 'error';
    // log L2 client errors instead.
    if (typeof this.l2Cache.on === 'function') {
      this.l2Cache.on('error', (error) => {
        console.error('L2 Cache error:', error);
      });
    }
  }

  /**
   * Runs one L2 command in callback style and exposes the reply as a promise,
   * so callers can genuinely await completion and receive the reply value.
   * @param {string} command - Method name on the L2 client.
   * @param {...*} args - Command arguments (without the callback).
   * @returns {Promise<*>} Resolves with the reply, rejects with the error.
   */
  callL2(command, ...args) {
    return new Promise((resolve, reject) => {
      this.l2Cache[command](...args, (error, reply) => {
        if (error) {
          reject(error);
        } else {
          resolve(reply);
        }
      });
    });
  }

  /**
   * Looks `key` up in L1, then L2 (promoting an L2 hit into L1).
   * @returns {Promise<*>} The cached value, or null on a miss or on error.
   */
  async get(key) {
    try {
      // Try L1 cache first
      const l1Value = this.l1Cache.get(key);
      if (l1Value !== undefined) {
        this.stats.l1Hits++;
        return l1Value;
      }
      // Try L2 cache
      const l2Value = await this.callL2('get', key);
      if (l2Value !== null && l2Value !== undefined) {
        this.stats.l2Hits++;
        // Promote to L1 cache
        const parsedValue = JSON.parse(l2Value);
        this.l1Cache.set(key, parsedValue);
        return parsedValue;
      }
      // Cache miss
      this.stats.misses++;
      return null;
    } catch (error) {
      console.error('Cache get error:', error);
      return null;
    }
  }

  /**
   * Stores `value` in both levels.
   * @param {number|null} [ttl=null] - TTL in seconds for both levels; when
   *   falsy each level uses its own default.
   * @returns {Promise<boolean>} true on success, false if an error was logged.
   */
  async set(key, value, ttl = null) {
    try {
      this.stats.sets++;
      // Set in L1 cache
      this.l1Cache.set(key, value, ttl || this.l1Cache.options.stdTTL);
      // Set in L2 cache
      const serializedValue = JSON.stringify(value);
      await this.callL2('setex', key, ttl || this.l2TTL, serializedValue);
      return true;
    } catch (error) {
      console.error('Cache set error:', error);
      return false;
    }
  }

  /** Removes `key` from both levels; returns false if an error was logged. */
  async delete(key) {
    try {
      // Delete from both caches
      this.l1Cache.del(key);
      await this.callL2('del', key);
      return true;
    } catch (error) {
      console.error('Cache delete error:', error);
      return false;
    }
  }

  /** Flushes both levels; returns false if an error was logged. */
  async clear() {
    try {
      this.l1Cache.flushAll();
      await this.callL2('flushall');
      return true;
    } catch (error) {
      console.error('Cache clear error:', error);
      return false;
    }
  }

  /**
   * Cache-aside pattern: returns the cached value, or calls `fetchFunction`,
   * caches its result and returns it. A cached or fetched `null` is treated
   * as a miss on the next call.
   * @throws Whatever `fetchFunction` throws (after logging).
   */
  async getOrSet(key, fetchFunction, ttl = null) {
    const cachedValue = await this.get(key);
    if (cachedValue !== null) {
      return cachedValue;
    }
    try {
      const freshValue = await fetchFunction();
      await this.set(key, freshValue, ttl);
      return freshValue;
    } catch (error) {
      console.error('Cache getOrSet error:', error);
      throw error;
    }
  }

  /**
   * Write-through pattern: persists first, then caches.
   * @throws Whatever `persistFunction` throws (after logging).
   */
  async setAndPersist(key, value, persistFunction, ttl = null) {
    try {
      // Persist to database first
      await persistFunction(value);
      // Then cache
      await this.set(key, value, ttl);
      return true;
    } catch (error) {
      console.error('Cache setAndPersist error:', error);
      throw error;
    }
  }

  /**
   * Write-behind pattern: caches immediately and calls `persistFunction`
   * after `delay` milliseconds. A failure of the delayed persist is only
   * logged; it is not reported to the caller.
   */
  async setWithDelayedPersist(key, value, persistFunction, ttl = null, delay = 5000) {
    try {
      // Cache immediately
      await this.set(key, value, ttl);
      // Persist after delay
      setTimeout(async () => {
        try {
          await persistFunction(value);
        } catch (error) {
          console.error('Delayed persist error:', error);
        }
      }, delay);
      return true;
    } catch (error) {
      console.error('Cache setWithDelayedPersist error:', error);
      throw error;
    }
  }

  /** Returns the raw counters plus hit/miss rates and L1 size figures. */
  getStats() {
    const total = this.stats.l1Hits + this.stats.l2Hits + this.stats.misses;
    const rate = (count) => (total > 0 ? (count / total * 100).toFixed(2) + '%' : '0%');
    const l1Stats = this.l1Cache.getStats();
    return {
      ...this.stats,
      total,
      l1HitRate: rate(this.stats.l1Hits),
      l2HitRate: rate(this.stats.l2Hits),
      missRate: rate(this.stats.misses),
      l1Size: l1Stats.keys,
      l1Memory: l1Stats.vsize
    };
  }

  /** Resets all counters to zero. */
  resetStats() {
    this.stats = {
      l1Hits: 0,
      l2Hits: 0,
      misses: 0,
      sets: 0
    };
  }
}
module.exports = CacheManager;

缓存装饰器
javascript
// cache-decorators.js
const CacheManager = require('./cache-manager');
/**
 * Factory for method decorators that add caching and cache invalidation.
 *
 * The produced decorators follow the (target, propertyName, descriptor)
 * signature and replace `descriptor.value` with an async wrapper.
 */
class CacheDecorators {
  /**
   * @param {object} cacheManager - Object exposing async `get(key)`,
   *   `set(key, value, ttl)` and `delete(key)`.
   */
  constructor(cacheManager) {
    this.cache = cacheManager;
  }

  /**
   * Method decorator for caching.
   * @param {number} [ttl=3600] - TTL passed to `cache.set`.
   * @param {Function|null} [keyGenerator=null] - Called as
   *   (propertyName, args) to build the key; by default the key is
   *   `<ClassName>:<method>:<JSON of args>`.
   * @returns {Function} The decorator.
   */
  cached(ttl = 3600, keyGenerator = null) {
    // Capture the manager given to this CacheDecorators instance. Inside the
    // wrapper `this` is the decorated object, which may have no `cache`.
    const decoratorCache = this.cache;
    return (target, propertyName, descriptor) => {
      const originalMethod = descriptor.value;
      descriptor.value = async function (...args) {
        // Fall back to the instance's own `cache` for backward compatibility.
        const cache = decoratorCache || this.cache;
        const cacheKey = keyGenerator
          ? keyGenerator(propertyName, args)
          : `${target.constructor.name}:${propertyName}:${JSON.stringify(args)}`;

        // Try to get from cache (a cached null counts as a miss)
        const cachedResult = await cache.get(cacheKey);
        if (cachedResult !== null) {
          return cachedResult;
        }

        // Execute original method
        const result = await originalMethod.apply(this, args);

        // Cache the result
        await cache.set(cacheKey, result, ttl);
        return result;
      };
      return descriptor;
    };
  }

  /**
   * Cache invalidation decorator: after the wrapped method resolves, deletes
   * the listed keys.
   * @param {Array<string|Function>} [patterns=[]] - Each entry is a literal
   *   key, or a function called as (args, result) returning one key or an
   *   iterable of keys. Keys are deleted literally; wildcards such as
   *   `user:*` are not expanded here.
   * @returns {Function} The decorator.
   */
  invalidatesCache(patterns = []) {
    const decoratorCache = this.cache;
    return (target, propertyName, descriptor) => {
      const originalMethod = descriptor.value;
      descriptor.value = async function (...args) {
        const cache = decoratorCache || this.cache;
        const result = await originalMethod.apply(this, args);

        // Invalidate cache patterns
        for (const pattern of patterns) {
          const resolved = typeof pattern === 'function'
            ? pattern(args, result)
            : [pattern];
          // A bare string would otherwise be iterated character by character.
          const keys = typeof resolved === 'string' ? [resolved] : resolved;
          for (const key of keys) {
            await cache.delete(key);
          }
        }
        return result;
      };
      return descriptor;
    };
  }
}
// Usage example
/**
 * Example service whose reads are meant to be cached and whose writes
 * invalidate the cached read.
 */
class UserService {
  /**
   * @param {object} cacheManager - Cache exposing an async `delete(key)`;
   *   also handed to a CacheDecorators instance.
   */
  constructor(cacheManager) {
    this.cache = cacheManager;
    this.decorators = new CacheDecorators(cacheManager);
  }

  // @cached(3600) // Cache for 1 hour
  /** Returns a simulated user record for `userId`. */
  async getUserById(userId) {
    // Simulate database call
    console.log(`Fetching user ${userId} from database`);
    return {
      id: userId,
      name: `User ${userId}`,
      email: `user${userId}@example.com`
    };
  }

  // @invalidatesCache(['user:*'])
  /**
   * Simulates updating a user and removes the cached `getUserById` entry.
   * @returns {Promise<object>} `userData` merged over `{ id: userId }`.
   */
  async updateUser(userId, userData) {
    // Simulate database update
    console.log(`Updating user ${userId} in database`);

    // Invalidate user cache. Build the key exactly as the `cached` decorator
    // does (JSON of the argument list); hard-coding quotes around the ID only
    // matched string IDs and left numeric IDs stale.
    await this.cache.delete(`UserService:getUserById:${JSON.stringify([userId])}`);
    return { id: userId, ...userData };
  }
}
module.exports = { CacheDecorators, UserService };

状态同步
事件驱动的状态同步
javascript
// state-sync.js
const EventEmitter = require('events');
/**
 * Keeps a versioned key/value state and pushes it to registered nodes.
 *
 * Each node is represented by a `connection` object with a `send(string)`
 * method. Messages are JSON strings of the form `{ type, data }`.
 * Emits `nodeRegistered`, `nodeUnregistered` and `stateChanged`.
 */
class StateSynchronizer extends EventEmitter {
  constructor() {
    super();
    this.nodes = new Map();
    this.state = new Map();
    this.version = 0;
    this.heartbeatTimer = null;
  }

  /**
   * Registers a node and immediately sends it the full state.
   * `nodeRegistered` is emitted only if the node survived that initial sync;
   * a failed sync removes the node and emits `nodeUnregistered` instead.
   */
  registerNode(nodeId, connection) {
    this.nodes.set(nodeId, {
      id: nodeId,
      connection,
      lastSync: new Date(),
      version: 0
    });
    // Send current state to new node
    this.syncNode(nodeId);
    if (this.nodes.has(nodeId)) {
      this.emit('nodeRegistered', nodeId);
    }
  }

  /** Removes a node and emits `nodeUnregistered`. */
  unregisterNode(nodeId) {
    this.nodes.delete(nodeId);
    this.emit('nodeUnregistered', nodeId);
  }

  /**
   * Stores a value, bumps the version and broadcasts the change.
   * @param {*} [sourceNode=null] - Node ID to skip when broadcasting.
   */
  setState(key, value, sourceNode = null) {
    const oldValue = this.state.get(key);
    this.state.set(key, value);
    this.version++;
    const change = {
      key,
      value,
      oldValue,
      version: this.version,
      timestamp: new Date(),
      sourceNode
    };
    // Broadcast to all nodes except source
    this.broadcastChange(change, sourceNode);
    this.emit('stateChanged', change);
  }

  /** Returns the value stored under `key`. */
  getState(key) {
    return this.state.get(key);
  }

  /** Returns the whole state as a plain object. */
  getAllState() {
    return Object.fromEntries(this.state);
  }

  /**
   * Sends a `stateChange` message to every node except `excludeNode`.
   * A node whose `send` throws is unregistered.
   */
  broadcastChange(change, excludeNode = null) {
    const message = JSON.stringify({
      type: 'stateChange',
      data: change
    });
    for (const [nodeId, node] of this.nodes) {
      if (nodeId === excludeNode) {
        continue;
      }
      try {
        node.connection.send(message);
      } catch (error) {
        console.error(`Failed to send to node ${nodeId}:`, error);
        this.unregisterNode(nodeId);
      }
    }
  }

  /**
   * Sends a `fullSync` message with the entire state and version to one node
   * and records the sync time/version on success. A node whose `send` throws
   * is unregistered. Unknown node IDs are ignored.
   */
  syncNode(nodeId) {
    const node = this.nodes.get(nodeId);
    if (!node) return;
    const syncData = {
      type: 'fullSync',
      data: {
        state: this.getAllState(),
        version: this.version
      }
    };
    try {
      node.connection.send(JSON.stringify(syncData));
      node.lastSync = new Date();
      node.version = this.version;
    } catch (error) {
      console.error(`Failed to sync node ${nodeId}:`, error);
      this.unregisterNode(nodeId);
    }
  }

  /**
   * Parses and dispatches a raw message from a node. Unparseable or
   * otherwise failing messages are logged; unknown types are ignored.
   */
  handleMessage(nodeId, message) {
    try {
      const { type, data } = JSON.parse(message);
      switch (type) {
        case 'stateChange':
          this.handleStateChange(nodeId, data);
          break;
        case 'syncRequest':
          this.syncNode(nodeId);
          break;
        case 'heartbeat':
          this.handleHeartbeat(nodeId);
          break;
      }
    } catch (error) {
      console.error(`Invalid message from node ${nodeId}:`, error);
    }
  }

  /**
   * Applies a change reported by a node, but only when its version is newer
   * than the local one; then relays it to the other nodes.
   */
  handleStateChange(nodeId, change) {
    if (change.version > this.version) {
      this.state.set(change.key, change.value);
      this.version = change.version;
      // Broadcast to other nodes
      this.broadcastChange(change, nodeId);
      this.emit('stateChanged', { ...change, sourceNode: nodeId });
    }
  }

  /** Records that a node is alive by refreshing its `lastSync`. */
  handleHeartbeat(nodeId) {
    const node = this.nodes.get(nodeId);
    if (node) {
      node.lastSync = new Date();
    }
  }

  /**
   * Unregisters every node whose `lastSync` is older than `maxAge` ms.
   * @returns {Array} IDs of the nodes that were removed.
   */
  removeStaleNodes(maxAge) {
    const now = new Date();
    const removed = [];
    for (const [nodeId, node] of this.nodes) {
      if (now - node.lastSync > maxAge) {
        console.log(`Node ${nodeId} appears to be disconnected`);
        this.unregisterNode(nodeId);
        removed.push(nodeId);
      }
    }
    return removed;
  }

  /**
   * Starts (or restarts) the periodic check that drops nodes silent for more
   * than two intervals. Restarting replaces the previous timer instead of
   * stacking another one; the timer is unref'd so it does not keep the
   * process alive on its own.
   * @param {number} [interval=30000] - Check period in milliseconds.
   * @returns {object} The interval handle.
   */
  startHeartbeat(interval = 30000) {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      this.removeStaleNodes(interval * 2);
    }, interval);
    if (typeof this.heartbeatTimer.unref === 'function') {
      this.heartbeatTimer.unref();
    }
    return this.heartbeatTimer;
  }

  /**
   * Stops the periodic check.
   * @returns {boolean} Whether a timer was running.
   */
  stopHeartbeat() {
    if (!this.heartbeatTimer) {
      return false;
    }
    clearInterval(this.heartbeatTimer);
    this.heartbeatTimer = null;
    return true;
  }

  /** Returns node count, state size, version and per-node sync info. */
  getNodeStats() {
    return {
      totalNodes: this.nodes.size,
      stateSize: this.state.size,
      version: this.version,
      nodes: Array.from(this.nodes.entries()).map(([id, node]) => ({
        id,
        lastSync: node.lastSync,
        version: node.version
      }))
    };
  }
}
module.exports = StateSynchronizer;

下一步
在下一章中，我们将探索 Node.js 中的函数和方法，包括高级函数模式、闭包和函数式编程概念。
实践练习
- 使用 Redis 实现分布式会话存储
- 创建带有自动缓存预热的多级缓存系统
- 使用 WebSocket 构建实时状态同步系统
- 实现带有自动失效的缓存旁路模式
关键要点
- 会话管理在无状态 HTTP 中实现有状态交互
- 内存状态管理提供快速访问应用程序数据
- 多级缓存提高性能并减少数据库负载
- 状态同步启用分布式应用程序架构
- 正确的缓存失效策略防止过期数据问题
- 事件驱动的状态管理启用响应式应用程序
- 监控和统计有助于优化状态管理性能