Skip to content

Bun WebSocket

Bun 内置了高性能的 WebSocket 服务器,支持实时双向通信。本章介绍 Bun WebSocket 的使用方法。

WebSocket 服务器

基本服务器

typescript
const server = Bun.serve({
  port: 3000,
  
  fetch(request, server) {
    // 升级为 WebSocket 连接
    if (server.upgrade(request)) {
      return; // 升级成功
    }
    
    return new Response("请使用 WebSocket 连接");
  },
  
  websocket: {
    // 连接打开
    open(ws) {
      console.log("客户端已连接");
      ws.send("欢迎连接!");
    },
    
    // 收到消息
    message(ws, message) {
      console.log("收到消息:", message);
      ws.send(`你说: ${message}`);
    },
    
    // 连接关闭
    close(ws, code, reason) {
      console.log("连接关闭:", code, reason);
    },
    
    // 发生错误
    error(ws, error) {
      console.error("WebSocket 错误:", error);
    },
  },
});

console.log(`WebSocket 服务器运行在 ws://localhost:${server.port}`);

客户端连接

javascript
// 浏览器端 JavaScript
const ws = new WebSocket("ws://localhost:3000");

ws.onopen = () => {
  console.log("已连接");
  ws.send("Hello, Server!");
};

ws.onmessage = (event) => {
  console.log("收到消息:", event.data);
};

ws.onclose = () => {
  console.log("连接已关闭");
};

ws.onerror = (error) => {
  console.error("WebSocket 错误:", error);
};

WebSocket 配置

完整配置

typescript
Bun.serve({
  port: 3000,
  
  fetch(request, server) {
    // 可以在升级时传递数据
    const userId = new URL(request.url).searchParams.get("userId");
    
    const success = server.upgrade(request, {
      // 传递给 websocket 处理程序的数据
      data: {
        userId,
        connectedAt: Date.now(),
      },
    });
    
    if (success) return;
    return new Response("升级失败", { status: 400 });
  },
  
  websocket: {
    // 最大消息大小(字节)
    maxPayloadLength: 16 * 1024 * 1024, // 16 MB
    
    // 空闲超时(秒)
    idleTimeout: 120,
    
    // 背压限制
    backpressureLimit: 1024 * 1024, // 1 MB
    
    // 是否自动关闭空闲连接
    closeOnBackpressureLimit: false,
    
    // 启用压缩
    perMessageDeflate: true,
    
    open(ws) {
      // 访问传递的数据
      console.log("用户连接:", ws.data.userId);
    },
    
    message(ws, message) {
      console.log(`用户 ${ws.data.userId} 说:`, message);
    },
    
    close(ws) {
      console.log("用户断开:", ws.data.userId);
    },
  },
});

消息处理

发送消息

typescript
websocket: {
  message(ws, message) {
    // 发送文本
    ws.send("文本消息");
    
    // 发送 JSON
    ws.send(JSON.stringify({ type: "message", data: "Hello" }));
    
    // 发送二进制数据
    ws.send(new Uint8Array([1, 2, 3, 4]));
    
    // 检查发送结果
    const bytesSent = ws.send("消息");
    console.log("发送字节数:", bytesSent);
  },
}

接收消息

typescript
websocket: {
  message(ws, message) {
    // message 可以是 string 或 Buffer
    if (typeof message === "string") {
      console.log("文本消息:", message);
      
      // 尝试解析 JSON
      try {
        const data = JSON.parse(message);
        handleJsonMessage(ws, data);
      } catch {
        handleTextMessage(ws, message);
      }
    } else {
      console.log("二进制消息:", message.length, "字节");
      handleBinaryMessage(ws, message);
    }
  },
}

function handleJsonMessage(ws: ServerWebSocket, data: any) {
  switch (data.type) {
    case "ping":
      ws.send(JSON.stringify({ type: "pong" }));
      break;
    case "message":
      console.log("收到消息:", data.content);
      break;
  }
}

广播和频道

Pub/Sub 模式

typescript
const server = Bun.serve({
  port: 3000,
  
  fetch(request, server) {
    const url = new URL(request.url);
    const room = url.searchParams.get("room") || "default";
    
    server.upgrade(request, {
      data: { room },
    });
  },
  
  websocket: {
    open(ws) {
      // 订阅频道
      ws.subscribe(ws.data.room);
      console.log(`用户加入房间: ${ws.data.room}`);
      
      // 向房间广播
      server.publish(ws.data.room, `新用户加入了 ${ws.data.room}`);
    },
    
    message(ws, message) {
      // 向房间所有成员发送消息
      server.publish(ws.data.room, message);
    },
    
    close(ws) {
      // 取消订阅(自动处理)
      server.publish(ws.data.room, "用户离开了房间");
    },
  },
});

订阅管理

typescript
websocket: {
  open(ws) {
    // 订阅多个频道
    ws.subscribe("global");
    ws.subscribe(`user:${ws.data.userId}`);
    ws.subscribe("notifications");
  },
  
  message(ws, message) {
    const data = JSON.parse(message as string);
    
    switch (data.action) {
      case "subscribe":
        ws.subscribe(data.channel);
        break;
      case "unsubscribe":
        ws.unsubscribe(data.channel);
        break;
      case "publish":
        server.publish(data.channel, data.message);
        break;
    }
  },
  
  close(ws) {
    // 所有订阅会自动清理
  },
}

连接管理

跟踪连接

typescript
// 使用 Set 跟踪所有连接
const connections = new Set<ServerWebSocket>();

Bun.serve({
  port: 3000,
  
  fetch(request, server) {
    server.upgrade(request);
  },
  
  websocket: {
    open(ws) {
      connections.add(ws);
      console.log(`连接数: ${connections.size}`);
    },
    
    close(ws) {
      connections.delete(ws);
      console.log(`连接数: ${connections.size}`);
    },
    
    message(ws, message) {
      // 广播给所有连接
      for (const client of connections) {
        if (client !== ws) {
          client.send(message);
        }
      }
    },
  },
});

// 定期发送心跳
setInterval(() => {
  for (const ws of connections) {
    ws.ping();
  }
}, 30000);

连接认证

typescript
Bun.serve({
  port: 3000,
  
  async fetch(request, server) {
    // 验证 token
    const url = new URL(request.url);
    const token = url.searchParams.get("token");
    
    if (!token) {
      return new Response("缺少认证令牌", { status: 401 });
    }
    
    const user = await verifyToken(token);
    if (!user) {
      return new Response("无效的认证令牌", { status: 401 });
    }
    
    // 验证通过,升级连接
    server.upgrade(request, {
      data: { user },
    });
  },
  
  websocket: {
    open(ws) {
      console.log(`用户 ${ws.data.user.name} 已连接`);
    },
    
    message(ws, message) {
      // 可以访问用户信息
      console.log(`${ws.data.user.name}: ${message}`);
    },
  },
});

async function verifyToken(token: string) {
  // 实现 token 验证逻辑
  if (token === "valid-token") {
    return { id: 1, name: "张三" };
  }
  return null;
}

心跳和保活

服务端心跳

typescript
websocket: {
  open(ws) {
    // 发送 ping
    ws.ping();
  },
  
  pong(ws) {
    console.log("收到 pong");
  },
  
  message(ws, message) {
    // 收到消息时也可以发送 ping
    ws.ping();
  },
}

客户端心跳

javascript
// 客户端心跳实现
class WebSocketClient {
  constructor(url) {
    this.url = url;
    this.heartbeatInterval = 30000;
    this.connect();
  }
  
  connect() {
    this.ws = new WebSocket(this.url);
    
    this.ws.onopen = () => {
      console.log("已连接");
      this.startHeartbeat();
    };
    
    this.ws.onclose = () => {
      console.log("连接关闭,尝试重连...");
      this.stopHeartbeat();
      setTimeout(() => this.connect(), 3000);
    };
    
    this.ws.onmessage = (event) => {
      if (event.data === "pong") {
        console.log("心跳正常");
        return;
      }
      // 处理其他消息
    };
  }
  
  startHeartbeat() {
    this.heartbeat = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.ws.send("ping");
      }
    }, this.heartbeatInterval);
  }
  
  stopHeartbeat() {
    if (this.heartbeat) {
      clearInterval(this.heartbeat);
    }
  }
}

聊天室示例

typescript
// chat-server.ts
interface User {
  id: string;
  name: string;
}

interface Message {
  type: "message" | "join" | "leave" | "users";
  user?: string;
  content?: string;
  users?: string[];
  timestamp: number;
}

const users = new Map<ServerWebSocket, User>();

const server = Bun.serve({
  port: 3000,
  
  fetch(request, server) {
    const url = new URL(request.url);
    const name = url.searchParams.get("name");
    
    if (!name) {
      return new Response("请提供用户名: ?name=xxx");
    }
    
    server.upgrade(request, {
      data: {
        id: crypto.randomUUID(),
        name,
      },
    });
  },
  
  websocket: {
    open(ws) {
      const user: User = ws.data;
      users.set(ws, user);
      
      // 订阅聊天室
      ws.subscribe("chat");
      
      // 广播用户加入
      broadcast({
        type: "join",
        user: user.name,
        timestamp: Date.now(),
      });
      
      // 发送当前用户列表
      ws.send(JSON.stringify({
        type: "users",
        users: Array.from(users.values()).map(u => u.name),
        timestamp: Date.now(),
      }));
    },
    
    message(ws, message) {
      const user = users.get(ws);
      if (!user) return;
      
      const data = JSON.parse(message as string);
      
      if (data.type === "message") {
        broadcast({
          type: "message",
          user: user.name,
          content: data.content,
          timestamp: Date.now(),
        });
      }
    },
    
    close(ws) {
      const user = users.get(ws);
      if (user) {
        users.delete(ws);
        
        broadcast({
          type: "leave",
          user: user.name,
          timestamp: Date.now(),
        });
      }
    },
  },
});

function broadcast(message: Message) {
  server.publish("chat", JSON.stringify(message));
}

console.log(`聊天服务器运行在 ws://localhost:${server.port}`);

聊天客户端

html
<!DOCTYPE html>
<html>
<head>
  <title>Bun 聊天室</title>
  <style>
    #messages { height: 300px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; }
    .message { margin: 5px 0; }
    .join { color: green; }
    .leave { color: red; }
  </style>
</head>
<body>
  <div id="messages"></div>
  <input type="text" id="input" placeholder="输入消息...">
  <button onclick="sendMessage()">发送</button>
  
  <script>
    const name = prompt("请输入你的名字:");
    const ws = new WebSocket(`ws://localhost:3000?name=${encodeURIComponent(name)}`);
    const messages = document.getElementById("messages");
    
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      const div = document.createElement("div");
      div.className = "message";
      
      switch (data.type) {
        case "message":
          div.textContent = `${data.user}: ${data.content}`;
          break;
        case "join":
          div.className += " join";
          div.textContent = `${data.user} 加入了聊天室`;
          break;
        case "leave":
          div.className += " leave";
          div.textContent = `${data.user} 离开了聊天室`;
          break;
        case "users":
          div.textContent = `在线用户: ${data.users.join(", ")}`;
          break;
      }
      
      messages.appendChild(div);
      messages.scrollTop = messages.scrollHeight;
    };
    
    function sendMessage() {
      const input = document.getElementById("input");
      if (input.value) {
        ws.send(JSON.stringify({ type: "message", content: input.value }));
        input.value = "";
      }
    }
    
    document.getElementById("input").addEventListener("keypress", (e) => {
      if (e.key === "Enter") sendMessage();
    });
  </script>
</body>
</html>

WebSocket 客户端

Bun 作为客户端

typescript
// Bun 也可以作为 WebSocket 客户端
const ws = new WebSocket("ws://localhost:3000");

ws.addEventListener("open", () => {
  console.log("已连接到服务器");
  ws.send("Hello, Server!");
});

ws.addEventListener("message", (event) => {
  console.log("收到:", event.data);
});

ws.addEventListener("close", () => {
  console.log("连接已关闭");
});

// 保持进程运行
await Bun.sleep(Infinity);

小结

本章介绍了:

  • ✅ 创建 WebSocket 服务器
  • ✅ 消息发送和接收
  • ✅ Pub/Sub 广播模式
  • ✅ 连接管理和认证
  • ✅ 心跳和保活机制
  • ✅ 完整聊天室示例

下一步

继续阅读 Fetch API 了解 Bun 的网络请求功能。

本站内容仅供学习和研究使用。