Zig 异步编程
Zig 提供了强大的异步编程支持,包括 async/await 语法和协程机制。本章将介绍 Zig 中异步编程的基本概念和使用方法。
异步编程基础
async 和 await 关键字
zig
const std = @import("std");
// 异步函数
async fn asyncTask(id: u32, duration: u64) void {
std.debug.print("任务 {} 开始\n", .{id});
// 模拟异步操作
std.time.sleep(duration * 1000000); // 转换为纳秒
std.debug.print("任务 {} 完成\n", .{id});
}
// 返回值的异步函数
async fn asyncCalculation(a: i32, b: i32) i32 {
std.debug.print("开始计算 {} + {}\n", .{ a, b });
// 模拟计算延迟
std.time.sleep(100000000); // 100ms
const result = a + b;
std.debug.print("计算完成: {} + {} = {}\n", .{ a, b, result });
return result;
}
pub fn main() void {
std.debug.print("异步编程示例\n");
// 启动异步任务
var frame1 = async asyncTask(1, 200);
var frame2 = async asyncTask(2, 100);
var frame3 = async asyncCalculation(10, 20);
std.debug.print("所有任务已启动\n");
// 等待任务完成
await frame1;
await frame2;
const result = await frame3;
std.debug.print("计算结果: {}\n", .{result});
std.debug.print("所有任务完成\n");
}协程帧 (Frame)
zig
const std = @import("std");
// 异步函数返回协程帧
async fn countDown(from: u32) void {
var i = from;
while (i > 0) : (i -= 1) {
std.debug.print("倒计时: {}\n", .{i});
std.time.sleep(500000000); // 500ms
// 主动让出控制权
suspend {}
}
std.debug.print("倒计时结束!\n", .{});
}
async fn countUp(to: u32) void {
for (1..to + 1) |i| {
std.debug.print("正计时: {}\n", .{i});
std.time.sleep(300000000); // 300ms
suspend {}
}
std.debug.print("正计时结束!\n", .{});
}
pub fn main() void {
std.debug.print("协程示例\n");
// 创建协程帧
var countdown_frame = async countDown(5);
var countup_frame = async countUp(3);
// 手动调度协程
var countdown_done = false;
var countup_done = false;
while (!countdown_done or !countup_done) {
if (!countdown_done) {
if (await countdown_frame) {
countdown_done = true;
} else |_| {
// 协程被挂起,继续执行其他协程
}
}
if (!countup_done) {
if (await countup_frame) {
countup_done = true;
} else |_| {
// 协程被挂起
}
}
std.time.sleep(100000000); // 100ms
}
std.debug.print("所有协程完成\n");
}异步 I/O 操作
文件异步读写
zig
const std = @import("std");
// 异步文件读取
async fn readFileAsync(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
std.debug.print("开始异步读取文件: {s}\n", .{path});
// 模拟异步 I/O 延迟
std.time.sleep(100000000); // 100ms
// 实际的文件读取操作
const file = std.fs.cwd().openFile(path, .{}) catch |err| {
std.debug.print("文件打开失败: {}\n", .{err});
return err;
};
defer file.close();
const file_size = try file.getEndPos();
const contents = try allocator.alloc(u8, file_size);
_ = try file.readAll(contents);
std.debug.print("文件读取完成: {s} ({} 字节)\n", .{ path, file_size });
return contents;
}
// 异步文件写入
async fn writeFileAsync(path: []const u8, content: []const u8) !void {
std.debug.print("开始异步写入文件: {s}\n", .{path});
// 模拟异步 I/O 延迟
std.time.sleep(150000000); // 150ms
const file = try std.fs.cwd().createFile(path, .{});
defer file.close();
try file.writeAll(content);
std.debug.print("文件写入完成: {s} ({} 字节)\n", .{ path, content.len });
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
std.debug.print("异步文件 I/O 示例\n");
// 创建测试文件
const test_content = "Hello, Async Zig!\n这是异步文件操作的测试内容。\n";
const test_file = "async_test.txt";
// 异步写入文件
var write_frame = async writeFileAsync(test_file, test_content);
// 等待写入完成
try await write_frame;
// 异步读取文件
var read_frame = async readFileAsync(allocator, test_file);
const file_content = try await read_frame;
defer allocator.free(file_content);
std.debug.print("读取的文件内容:\n{s}\n", .{file_content});
// 清理测试文件
std.fs.cwd().deleteFile(test_file) catch {};
}异步网络编程
简单的异步服务器
zig
const std = @import("std");
// 异步处理客户端连接
async fn handleClient(allocator: std.mem.Allocator, client_id: u32) !void {
std.debug.print("客户端 {} 连接\n", .{client_id});
// 模拟处理请求的延迟
std.time.sleep(200000000 + client_id * 50000000); // 200ms + 额外延迟
// 模拟处理逻辑
const response = try std.fmt.allocPrint(allocator, "Hello from server! Client ID: {}", .{client_id});
defer allocator.free(response);
std.debug.print("向客户端 {} 发送响应: {s}\n", .{ client_id, response });
// 模拟发送响应的延迟
std.time.sleep(100000000); // 100ms
std.debug.print("客户端 {} 断开连接\n", .{client_id});
}
// 异步服务器
async fn runServer(allocator: std.mem.Allocator, max_clients: u32) !void {
std.debug.print("服务器启动,最大客户端数: {}\n", .{max_clients});
var client_frames = try allocator.alloc(@Frame(handleClient), max_clients);
defer allocator.free(client_frames);
// 模拟客户端连接
for (0..max_clients) |i| {
const client_id: u32 = @intCast(i + 1);
client_frames[i] = async handleClient(allocator, client_id);
// 模拟客户端连接间隔
std.time.sleep(50000000); // 50ms
}
// 等待所有客户端处理完成
for (client_frames) |*frame| {
try await frame;
}
std.debug.print("服务器关闭\n");
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
std.debug.print("异步网络服务器示例\n");
var server_frame = async runServer(allocator, 5);
try await server_frame;
std.debug.print("程序结束\n");
}异步任务调度
简单的任务调度器
zig
const std = @import("std");
const Task = struct {
id: u32,
priority: u32,
duration: u64, // 毫秒
const Self = @This();
pub fn compare(context: void, a: Self, b: Self) std.math.Order {
_ = context;
return std.math.order(a.priority, b.priority);
}
};
// 异步任务执行器
async fn executeTask(task: Task) void {
std.debug.print("开始执行任务 {} (优先级: {}, 预计耗时: {}ms)\n",
.{ task.id, task.priority, task.duration });
// 模拟任务执行
std.time.sleep(task.duration * 1000000); // 转换为纳秒
std.debug.print("任务 {} 执行完成\n", .{task.id});
}
// 异步任务调度器
async fn taskScheduler(allocator: std.mem.Allocator, tasks: []const Task) !void {
std.debug.print("任务调度器启动,共 {} 个任务\n", .{tasks.len});
// 按优先级排序任务
var sorted_tasks = try allocator.dupe(Task, tasks);
defer allocator.free(sorted_tasks);
std.mem.sort(Task, sorted_tasks, {}, Task.compare);
// 创建任务帧数组
var task_frames = try allocator.alloc(@Frame(executeTask), sorted_tasks.len);
defer allocator.free(task_frames);
// 启动所有任务
for (sorted_tasks, 0..) |task, i| {
task_frames[i] = async executeTask(task);
// 高优先级任务之间的间隔更短
const delay = if (task.priority <= 2) 10000000 else 50000000; // 10ms 或 50ms
std.time.sleep(delay);
}
// 等待所有任务完成
for (task_frames) |*frame| {
await frame;
}
std.debug.print("所有任务执行完成\n");
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
std.debug.print("异步任务调度示例\n");
const tasks = [_]Task{
Task{ .id = 1, .priority = 3, .duration = 200 },
Task{ .id = 2, .priority = 1, .duration = 100 },
Task{ .id = 3, .priority = 2, .duration = 150 },
Task{ .id = 4, .priority = 1, .duration = 80 },
Task{ .id = 5, .priority = 3, .duration = 300 },
};
var scheduler_frame = async taskScheduler(allocator, &tasks);
try await scheduler_frame;
std.debug.print("调度器关闭\n");
}异步错误处理
异步函数的错误处理
zig
const std = @import("std");
const AsyncError = error{
NetworkTimeout,
InvalidResponse,
ServiceUnavailable,
};
// 可能失败的异步操作
async fn unreliableOperation(id: u32, success_rate: f32) AsyncError![]const u8 {
std.debug.print("开始不可靠操作 {}\n", .{id});
// 模拟网络延迟
std.time.sleep(100000000 + id * 20000000); // 100ms + 额外延迟
// 模拟随机失败
var rng = std.rand.DefaultPrng.init(@intCast(std.time.timestamp() + id));
const random_value = rng.random().float(f32);
if (random_value > success_rate) {
const error_type = switch (@as(u32, @intFromFloat(random_value * 3))) {
0 => AsyncError.NetworkTimeout,
1 => AsyncError.InvalidResponse,
else => AsyncError.ServiceUnavailable,
};
std.debug.print("操作 {} 失败: {}\n", .{ id, error_type });
return error_type;
}
const result = switch (id % 3) {
0 => "数据A",
1 => "数据B",
else => "数据C",
};
std.debug.print("操作 {} 成功: {s}\n", .{ id, result });
return result;
}
// 带重试的异步操作
async fn operationWithRetry(id: u32, max_retries: u32) ![]const u8 {
var attempts: u32 = 0;
while (attempts < max_retries) {
attempts += 1;
std.debug.print("操作 {} 尝试 {}/{}\n", .{ id, attempts, max_retries });
if (unreliableOperation(id, 0.6)) |result| {
return result;
} else |err| {
if (attempts >= max_retries) {
std.debug.print("操作 {} 最终失败: {}\n", .{ id, err });
return err;
}
// 等待后重试
std.time.sleep(50000000); // 50ms
}
}
return AsyncError.ServiceUnavailable;
}
pub fn main() !void {
std.debug.print("异步错误处理示例\n");
const operations = [_]u32{ 1, 2, 3, 4, 5 };
var frames: [operations.len]@Frame(operationWithRetry) = undefined;
// 启动所有操作
for (operations, 0..) |id, i| {
frames[i] = async operationWithRetry(id, 3);
}
// 收集结果
var successful_operations: u32 = 0;
var failed_operations: u32 = 0;
for (frames, 0..) |*frame, i| {
if (await frame) |result| {
std.debug.print("✅ 操作 {} 最终成功: {s}\n", .{ operations[i], result });
successful_operations += 1;
} else |err| {
std.debug.print("❌ 操作 {} 最终失败: {}\n", .{ operations[i], err });
failed_operations += 1;
}
}
std.debug.print("\n统计结果:\n");
std.debug.print("成功: {} 个操作\n", .{successful_operations});
std.debug.print("失败: {} 个操作\n", .{failed_operations});
}异步编程最佳实践
1. 合理使用 suspend 和 resume
zig
const std = @import("std");
// 协作式多任务
async fn cooperativeTask(id: u32, work_units: u32) void {
std.debug.print("协作任务 {} 开始 ({} 个工作单元)\n", .{ id, work_units });
for (0..work_units) |i| {
// 执行一个工作单元
std.debug.print("任务 {} 执行工作单元 {}\n", .{ id, i + 1 });
// 模拟工作
std.time.sleep(50000000); // 50ms
// 主动让出控制权,允许其他任务执行
if ((i + 1) % 2 == 0) {
suspend {}
}
}
std.debug.print("协作任务 {} 完成\n", .{id});
}
pub fn main() void {
std.debug.print("协作式多任务示例\n");
var task1 = async cooperativeTask(1, 6);
var task2 = async cooperativeTask(2, 4);
var task3 = async cooperativeTask(3, 5);
// 等待所有任务完成
await task1;
await task2;
await task3;
std.debug.print("所有协作任务完成\n");
}2. 异步资源管理
zig
const std = @import("std");
const AsyncResource = struct {
id: u32,
is_acquired: bool,
const Self = @This();
pub fn init(id: u32) Self {
return Self{
.id = id,
.is_acquired = false,
};
}
pub async fn acquire(self: *Self) !void {
std.debug.print("尝试获取资源 {}\n", .{self.id});
// 模拟资源获取延迟
std.time.sleep(100000000); // 100ms
if (self.is_acquired) {
return error.ResourceBusy;
}
self.is_acquired = true;
std.debug.print("资源 {} 获取成功\n", .{self.id});
}
pub fn release(self: *Self) void {
if (self.is_acquired) {
self.is_acquired = false;
std.debug.print("资源 {} 已释放\n", .{self.id});
}
}
};
async fn useResource(resource: *AsyncResource, user_id: u32) !void {
std.debug.print("用户 {} 请求使用资源 {}\n", .{ user_id, resource.id });
// 获取资源
try await resource.acquire();
defer resource.release(); // 确保资源被释放
// 使用资源
std.debug.print("用户 {} 正在使用资源 {}\n", .{ user_id, resource.id });
std.time.sleep(200000000); // 200ms
std.debug.print("用户 {} 使用资源 {} 完成\n", .{ user_id, resource.id });
}
pub fn main() !void {
std.debug.print("异步资源管理示例\n");
var resource = AsyncResource.init(1);
var user1_frame = async useResource(&resource, 1);
var user2_frame = async useResource(&resource, 2);
// 等待所有用户完成
try await user1_frame;
try await user2_frame;
std.debug.print("资源管理示例完成\n");
}总结
本章介绍了 Zig 异步编程的基础知识:
- ✅ async/await 语法和协程概念
- ✅ 异步 I/O 操作
- ✅ 异步网络编程基础
- ✅ 任务调度和管理
- ✅ 异步错误处理
- ✅ 最佳实践和资源管理
Zig 的异步编程模型提供了强大而灵活的并发处理能力,适合构建高性能的异步应用程序。需要注意的是,Zig 的异步功能仍在发展中,某些 API 可能会在未来版本中发生变化。
在下一章中,我们将学习 Zig 与 C 语言的交互。