Bun WebSocket
Bun 内置了高性能的 WebSocket 服务器,支持实时双向通信。本章介绍 Bun WebSocket 的使用方法。
WebSocket 服务器
基本服务器
typescript
const server = Bun.serve({
port: 3000,
fetch(request, server) {
// 升级为 WebSocket 连接
if (server.upgrade(request)) {
return; // 升级成功
}
return new Response("请使用 WebSocket 连接");
},
websocket: {
// 连接打开
open(ws) {
console.log("客户端已连接");
ws.send("欢迎连接!");
},
// 收到消息
message(ws, message) {
console.log("收到消息:", message);
ws.send(`你说: ${message}`);
},
// 连接关闭
close(ws, code, reason) {
console.log("连接关闭:", code, reason);
},
// 发生错误
error(ws, error) {
console.error("WebSocket 错误:", error);
},
},
});
console.log(`WebSocket 服务器运行在 ws://localhost:${server.port}`);客户端连接
javascript
// 浏览器端 JavaScript
const ws = new WebSocket("ws://localhost:3000");
ws.onopen = () => {
console.log("已连接");
ws.send("Hello, Server!");
};
ws.onmessage = (event) => {
console.log("收到消息:", event.data);
};
ws.onclose = () => {
console.log("连接已关闭");
};
ws.onerror = (error) => {
console.error("WebSocket 错误:", error);
};WebSocket 配置
完整配置
typescript
Bun.serve({
port: 3000,
fetch(request, server) {
// 可以在升级时传递数据
const userId = new URL(request.url).searchParams.get("userId");
const success = server.upgrade(request, {
// 传递给 websocket 处理程序的数据
data: {
userId,
connectedAt: Date.now(),
},
});
if (success) return;
return new Response("升级失败", { status: 400 });
},
websocket: {
// 最大消息大小(字节)
maxPayloadLength: 16 * 1024 * 1024, // 16 MB
// 空闲超时(秒)
idleTimeout: 120,
// 背压限制
backpressureLimit: 1024 * 1024, // 1 MB
// 是否自动关闭空闲连接
closeOnBackpressureLimit: false,
// 启用压缩
perMessageDeflate: true,
open(ws) {
// 访问传递的数据
console.log("用户连接:", ws.data.userId);
},
message(ws, message) {
console.log(`用户 ${ws.data.userId} 说:`, message);
},
close(ws) {
console.log("用户断开:", ws.data.userId);
},
},
});消息处理
发送消息
typescript
websocket: {
message(ws, message) {
// 发送文本
ws.send("文本消息");
// 发送 JSON
ws.send(JSON.stringify({ type: "message", data: "Hello" }));
// 发送二进制数据
ws.send(new Uint8Array([1, 2, 3, 4]));
// 检查发送结果
const bytesSent = ws.send("消息");
console.log("发送字节数:", bytesSent);
},
}接收消息
typescript
websocket: {
message(ws, message) {
// message 可以是 string 或 Buffer
if (typeof message === "string") {
console.log("文本消息:", message);
// 尝试解析 JSON
try {
const data = JSON.parse(message);
handleJsonMessage(ws, data);
} catch {
handleTextMessage(ws, message);
}
} else {
console.log("二进制消息:", message.length, "字节");
handleBinaryMessage(ws, message);
}
},
}
function handleJsonMessage(ws: ServerWebSocket, data: any) {
switch (data.type) {
case "ping":
ws.send(JSON.stringify({ type: "pong" }));
break;
case "message":
console.log("收到消息:", data.content);
break;
}
}广播和频道
Pub/Sub 模式
typescript
const server = Bun.serve({
port: 3000,
fetch(request, server) {
const url = new URL(request.url);
const room = url.searchParams.get("room") || "default";
server.upgrade(request, {
data: { room },
});
},
websocket: {
open(ws) {
// 订阅频道
ws.subscribe(ws.data.room);
console.log(`用户加入房间: ${ws.data.room}`);
// 向房间广播
server.publish(ws.data.room, `新用户加入了 ${ws.data.room}`);
},
message(ws, message) {
// 向房间所有成员发送消息
server.publish(ws.data.room, message);
},
close(ws) {
// 取消订阅(自动处理)
server.publish(ws.data.room, "用户离开了房间");
},
},
});订阅管理
typescript
websocket: {
open(ws) {
// 订阅多个频道
ws.subscribe("global");
ws.subscribe(`user:${ws.data.userId}`);
ws.subscribe("notifications");
},
message(ws, message) {
const data = JSON.parse(message as string);
switch (data.action) {
case "subscribe":
ws.subscribe(data.channel);
break;
case "unsubscribe":
ws.unsubscribe(data.channel);
break;
case "publish":
server.publish(data.channel, data.message);
break;
}
},
close(ws) {
// 所有订阅会自动清理
},
}连接管理
跟踪连接
typescript
// 使用 Set 跟踪所有连接
const connections = new Set<ServerWebSocket>();
Bun.serve({
port: 3000,
fetch(request, server) {
server.upgrade(request);
},
websocket: {
open(ws) {
connections.add(ws);
console.log(`连接数: ${connections.size}`);
},
close(ws) {
connections.delete(ws);
console.log(`连接数: ${connections.size}`);
},
message(ws, message) {
// 广播给所有连接
for (const client of connections) {
if (client !== ws) {
client.send(message);
}
}
},
},
});
// 定期发送心跳
setInterval(() => {
for (const ws of connections) {
ws.ping();
}
}, 30000);连接认证
typescript
Bun.serve({
port: 3000,
async fetch(request, server) {
// 验证 token
const url = new URL(request.url);
const token = url.searchParams.get("token");
if (!token) {
return new Response("缺少认证令牌", { status: 401 });
}
const user = await verifyToken(token);
if (!user) {
return new Response("无效的认证令牌", { status: 401 });
}
// 验证通过,升级连接
server.upgrade(request, {
data: { user },
});
},
websocket: {
open(ws) {
console.log(`用户 ${ws.data.user.name} 已连接`);
},
message(ws, message) {
// 可以访问用户信息
console.log(`${ws.data.user.name}: ${message}`);
},
},
});
async function verifyToken(token: string) {
// 实现 token 验证逻辑
if (token === "valid-token") {
return { id: 1, name: "张三" };
}
return null;
}心跳和保活
服务端心跳
typescript
websocket: {
open(ws) {
// 发送 ping
ws.ping();
},
pong(ws) {
console.log("收到 pong");
},
message(ws, message) {
// 收到消息时也可以发送 ping
ws.ping();
},
}客户端心跳
javascript
// 客户端心跳实现
class WebSocketClient {
constructor(url) {
this.url = url;
this.heartbeatInterval = 30000;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log("已连接");
this.startHeartbeat();
};
this.ws.onclose = () => {
console.log("连接关闭,尝试重连...");
this.stopHeartbeat();
setTimeout(() => this.connect(), 3000);
};
this.ws.onmessage = (event) => {
if (event.data === "pong") {
console.log("心跳正常");
return;
}
// 处理其他消息
};
}
startHeartbeat() {
this.heartbeat = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send("ping");
}
}, this.heartbeatInterval);
}
stopHeartbeat() {
if (this.heartbeat) {
clearInterval(this.heartbeat);
}
}
}聊天室示例
typescript
// chat-server.ts
interface User {
id: string;
name: string;
}
interface Message {
type: "message" | "join" | "leave" | "users";
user?: string;
content?: string;
users?: string[];
timestamp: number;
}
const users = new Map<ServerWebSocket, User>();
const server = Bun.serve({
port: 3000,
fetch(request, server) {
const url = new URL(request.url);
const name = url.searchParams.get("name");
if (!name) {
return new Response("请提供用户名: ?name=xxx");
}
server.upgrade(request, {
data: {
id: crypto.randomUUID(),
name,
},
});
},
websocket: {
open(ws) {
const user: User = ws.data;
users.set(ws, user);
// 订阅聊天室
ws.subscribe("chat");
// 广播用户加入
broadcast({
type: "join",
user: user.name,
timestamp: Date.now(),
});
// 发送当前用户列表
ws.send(JSON.stringify({
type: "users",
users: Array.from(users.values()).map(u => u.name),
timestamp: Date.now(),
}));
},
message(ws, message) {
const user = users.get(ws);
if (!user) return;
const data = JSON.parse(message as string);
if (data.type === "message") {
broadcast({
type: "message",
user: user.name,
content: data.content,
timestamp: Date.now(),
});
}
},
close(ws) {
const user = users.get(ws);
if (user) {
users.delete(ws);
broadcast({
type: "leave",
user: user.name,
timestamp: Date.now(),
});
}
},
},
});
function broadcast(message: Message) {
server.publish("chat", JSON.stringify(message));
}
console.log(`聊天服务器运行在 ws://localhost:${server.port}`);聊天客户端
html
<!DOCTYPE html>
<html>
<head>
<title>Bun 聊天室</title>
<style>
#messages { height: 300px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; }
.message { margin: 5px 0; }
.join { color: green; }
.leave { color: red; }
</style>
</head>
<body>
<div id="messages"></div>
<input type="text" id="input" placeholder="输入消息...">
<button onclick="sendMessage()">发送</button>
<script>
const name = prompt("请输入你的名字:");
const ws = new WebSocket(`ws://localhost:3000?name=${encodeURIComponent(name)}`);
const messages = document.getElementById("messages");
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
const div = document.createElement("div");
div.className = "message";
switch (data.type) {
case "message":
div.textContent = `${data.user}: ${data.content}`;
break;
case "join":
div.className += " join";
div.textContent = `${data.user} 加入了聊天室`;
break;
case "leave":
div.className += " leave";
div.textContent = `${data.user} 离开了聊天室`;
break;
case "users":
div.textContent = `在线用户: ${data.users.join(", ")}`;
break;
}
messages.appendChild(div);
messages.scrollTop = messages.scrollHeight;
};
function sendMessage() {
const input = document.getElementById("input");
if (input.value) {
ws.send(JSON.stringify({ type: "message", content: input.value }));
input.value = "";
}
}
document.getElementById("input").addEventListener("keypress", (e) => {
if (e.key === "Enter") sendMessage();
});
</script>
</body>
</html>WebSocket 客户端
Bun 作为客户端
typescript
// Bun 也可以作为 WebSocket 客户端
const ws = new WebSocket("ws://localhost:3000");
ws.addEventListener("open", () => {
console.log("已连接到服务器");
ws.send("Hello, Server!");
});
ws.addEventListener("message", (event) => {
console.log("收到:", event.data);
});
ws.addEventListener("close", () => {
console.log("连接已关闭");
});
// 保持进程运行
await Bun.sleep(Infinity);小结
本章介绍了:
- ✅ 创建 WebSocket 服务器
- ✅ 消息发送和接收
- ✅ Pub/Sub 广播模式
- ✅ 连接管理和认证
- ✅ 心跳和保活机制
- ✅ 完整聊天室示例
下一步
继续阅读 Fetch API 了解 Bun 的网络请求功能。