Skip to content

Zig 异步编程

Zig 提供了强大的异步编程支持,包括 async/await 语法和协程机制。本章将介绍 Zig 中异步编程的基本概念和使用方法。

异步编程基础

async 和 await 关键字

zig
const std = @import("std");

// 异步函数
async fn asyncTask(id: u32, duration: u64) void {
    std.debug.print("任务 {} 开始\n", .{id});
    
    // 模拟异步操作
    std.time.sleep(duration * 1000000); // 转换为纳秒
    
    std.debug.print("任务 {} 完成\n", .{id});
}

// 返回值的异步函数
async fn asyncCalculation(a: i32, b: i32) i32 {
    std.debug.print("开始计算 {} + {}\n", .{ a, b });
    
    // 模拟计算延迟
    std.time.sleep(100000000); // 100ms
    
    const result = a + b;
    std.debug.print("计算完成: {} + {} = {}\n", .{ a, b, result });
    
    return result;
}

pub fn main() void {
    std.debug.print("异步编程示例\n");
    
    // 启动异步任务
    var frame1 = async asyncTask(1, 200);
    var frame2 = async asyncTask(2, 100);
    var frame3 = async asyncCalculation(10, 20);
    
    std.debug.print("所有任务已启动\n");
    
    // 等待任务完成
    await frame1;
    await frame2;
    const result = await frame3;
    
    std.debug.print("计算结果: {}\n", .{result});
    std.debug.print("所有任务完成\n");
}

协程帧 (Frame)

zig
const std = @import("std");

// 异步函数返回协程帧
async fn countDown(from: u32) void {
    var i = from;
    while (i > 0) : (i -= 1) {
        std.debug.print("倒计时: {}\n", .{i});
        std.time.sleep(500000000); // 500ms
        
        // 主动让出控制权
        suspend {}
    }
    std.debug.print("倒计时结束!\n", .{});
}

async fn countUp(to: u32) void {
    for (1..to + 1) |i| {
        std.debug.print("正计时: {}\n", .{i});
        std.time.sleep(300000000); // 300ms
        
        suspend {}
    }
    std.debug.print("正计时结束!\n", .{});
}

pub fn main() void {
    std.debug.print("协程示例\n");
    
    // 创建协程帧
    var countdown_frame = async countDown(5);
    var countup_frame = async countUp(3);
    
    // 手动调度协程
    var countdown_done = false;
    var countup_done = false;
    
    while (!countdown_done or !countup_done) {
        if (!countdown_done) {
            if (await countdown_frame) {
                countdown_done = true;
            } else |_| {
                // 协程被挂起,继续执行其他协程
            }
        }
        
        if (!countup_done) {
            if (await countup_frame) {
                countup_done = true;
            } else |_| {
                // 协程被挂起
            }
        }
        
        std.time.sleep(100000000); // 100ms
    }
    
    std.debug.print("所有协程完成\n");
}

异步 I/O 操作

文件异步读写

zig
const std = @import("std");

// 异步文件读取
async fn readFileAsync(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
    std.debug.print("开始异步读取文件: {s}\n", .{path});
    
    // 模拟异步 I/O 延迟
    std.time.sleep(100000000); // 100ms
    
    // 实际的文件读取操作
    const file = std.fs.cwd().openFile(path, .{}) catch |err| {
        std.debug.print("文件打开失败: {}\n", .{err});
        return err;
    };
    defer file.close();
    
    const file_size = try file.getEndPos();
    const contents = try allocator.alloc(u8, file_size);
    _ = try file.readAll(contents);
    
    std.debug.print("文件读取完成: {s} ({} 字节)\n", .{ path, file_size });
    return contents;
}

// 异步文件写入
async fn writeFileAsync(path: []const u8, content: []const u8) !void {
    std.debug.print("开始异步写入文件: {s}\n", .{path});
    
    // 模拟异步 I/O 延迟
    std.time.sleep(150000000); // 150ms
    
    const file = try std.fs.cwd().createFile(path, .{});
    defer file.close();
    
    try file.writeAll(content);
    
    std.debug.print("文件写入完成: {s} ({} 字节)\n", .{ path, content.len });
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    
    std.debug.print("异步文件 I/O 示例\n");
    
    // 创建测试文件
    const test_content = "Hello, Async Zig!\n这是异步文件操作的测试内容。\n";
    const test_file = "async_test.txt";
    
    // 异步写入文件
    var write_frame = async writeFileAsync(test_file, test_content);
    
    // 等待写入完成
    try await write_frame;
    
    // 异步读取文件
    var read_frame = async readFileAsync(allocator, test_file);
    const file_content = try await read_frame;
    defer allocator.free(file_content);
    
    std.debug.print("读取的文件内容:\n{s}\n", .{file_content});
    
    // 清理测试文件
    std.fs.cwd().deleteFile(test_file) catch {};
}

异步网络编程

简单的异步服务器

zig
const std = @import("std");

// 异步处理客户端连接
async fn handleClient(allocator: std.mem.Allocator, client_id: u32) !void {
    std.debug.print("客户端 {} 连接\n", .{client_id});
    
    // 模拟处理请求的延迟
    std.time.sleep(200000000 + client_id * 50000000); // 200ms + 额外延迟
    
    // 模拟处理逻辑
    const response = try std.fmt.allocPrint(allocator, "Hello from server! Client ID: {}", .{client_id});
    defer allocator.free(response);
    
    std.debug.print("向客户端 {} 发送响应: {s}\n", .{ client_id, response });
    
    // 模拟发送响应的延迟
    std.time.sleep(100000000); // 100ms
    
    std.debug.print("客户端 {} 断开连接\n", .{client_id});
}

// 异步服务器
async fn runServer(allocator: std.mem.Allocator, max_clients: u32) !void {
    std.debug.print("服务器启动,最大客户端数: {}\n", .{max_clients});
    
    var client_frames = try allocator.alloc(@Frame(handleClient), max_clients);
    defer allocator.free(client_frames);
    
    // 模拟客户端连接
    for (0..max_clients) |i| {
        const client_id: u32 = @intCast(i + 1);
        client_frames[i] = async handleClient(allocator, client_id);
        
        // 模拟客户端连接间隔
        std.time.sleep(50000000); // 50ms
    }
    
    // 等待所有客户端处理完成
    for (client_frames) |*frame| {
        try await frame;
    }
    
    std.debug.print("服务器关闭\n");
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    
    std.debug.print("异步网络服务器示例\n");
    
    var server_frame = async runServer(allocator, 5);
    try await server_frame;
    
    std.debug.print("程序结束\n");
}

异步任务调度

简单的任务调度器

zig
const std = @import("std");

const Task = struct {
    id: u32,
    priority: u32,
    duration: u64, // 毫秒
    
    const Self = @This();
    
    pub fn compare(context: void, a: Self, b: Self) std.math.Order {
        _ = context;
        return std.math.order(a.priority, b.priority);
    }
};

// 异步任务执行器
async fn executeTask(task: Task) void {
    std.debug.print("开始执行任务 {} (优先级: {}, 预计耗时: {}ms)\n", 
                    .{ task.id, task.priority, task.duration });
    
    // 模拟任务执行
    std.time.sleep(task.duration * 1000000); // 转换为纳秒
    
    std.debug.print("任务 {} 执行完成\n", .{task.id});
}

// 异步任务调度器
async fn taskScheduler(allocator: std.mem.Allocator, tasks: []const Task) !void {
    std.debug.print("任务调度器启动,共 {} 个任务\n", .{tasks.len});
    
    // 按优先级排序任务
    var sorted_tasks = try allocator.dupe(Task, tasks);
    defer allocator.free(sorted_tasks);
    
    std.mem.sort(Task, sorted_tasks, {}, Task.compare);
    
    // 创建任务帧数组
    var task_frames = try allocator.alloc(@Frame(executeTask), sorted_tasks.len);
    defer allocator.free(task_frames);
    
    // 启动所有任务
    for (sorted_tasks, 0..) |task, i| {
        task_frames[i] = async executeTask(task);
        
        // 高优先级任务之间的间隔更短
        const delay = if (task.priority <= 2) 10000000 else 50000000; // 10ms 或 50ms
        std.time.sleep(delay);
    }
    
    // 等待所有任务完成
    for (task_frames) |*frame| {
        await frame;
    }
    
    std.debug.print("所有任务执行完成\n");
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();
    
    std.debug.print("异步任务调度示例\n");
    
    const tasks = [_]Task{
        Task{ .id = 1, .priority = 3, .duration = 200 },
        Task{ .id = 2, .priority = 1, .duration = 100 },
        Task{ .id = 3, .priority = 2, .duration = 150 },
        Task{ .id = 4, .priority = 1, .duration = 80 },
        Task{ .id = 5, .priority = 3, .duration = 300 },
    };
    
    var scheduler_frame = async taskScheduler(allocator, &tasks);
    try await scheduler_frame;
    
    std.debug.print("调度器关闭\n");
}

异步错误处理

异步函数的错误处理

zig
const std = @import("std");

const AsyncError = error{
    NetworkTimeout,
    InvalidResponse,
    ServiceUnavailable,
};

// 可能失败的异步操作
async fn unreliableOperation(id: u32, success_rate: f32) AsyncError![]const u8 {
    std.debug.print("开始不可靠操作 {}\n", .{id});
    
    // 模拟网络延迟
    std.time.sleep(100000000 + id * 20000000); // 100ms + 额外延迟
    
    // 模拟随机失败
    var rng = std.rand.DefaultPrng.init(@intCast(std.time.timestamp() + id));
    const random_value = rng.random().float(f32);
    
    if (random_value > success_rate) {
        const error_type = switch (@as(u32, @intFromFloat(random_value * 3))) {
            0 => AsyncError.NetworkTimeout,
            1 => AsyncError.InvalidResponse,
            else => AsyncError.ServiceUnavailable,
        };
        
        std.debug.print("操作 {} 失败: {}\n", .{ id, error_type });
        return error_type;
    }
    
    const result = switch (id % 3) {
        0 => "数据A",
        1 => "数据B",
        else => "数据C",
    };
    
    std.debug.print("操作 {} 成功: {s}\n", .{ id, result });
    return result;
}

// 带重试的异步操作
async fn operationWithRetry(id: u32, max_retries: u32) ![]const u8 {
    var attempts: u32 = 0;
    
    while (attempts < max_retries) {
        attempts += 1;
        std.debug.print("操作 {} 尝试 {}/{}\n", .{ id, attempts, max_retries });
        
        if (unreliableOperation(id, 0.6)) |result| {
            return result;
        } else |err| {
            if (attempts >= max_retries) {
                std.debug.print("操作 {} 最终失败: {}\n", .{ id, err });
                return err;
            }
            
            // 等待后重试
            std.time.sleep(50000000); // 50ms
        }
    }
    
    return AsyncError.ServiceUnavailable;
}

pub fn main() !void {
    std.debug.print("异步错误处理示例\n");
    
    const operations = [_]u32{ 1, 2, 3, 4, 5 };
    var frames: [operations.len]@Frame(operationWithRetry) = undefined;
    
    // 启动所有操作
    for (operations, 0..) |id, i| {
        frames[i] = async operationWithRetry(id, 3);
    }
    
    // 收集结果
    var successful_operations: u32 = 0;
    var failed_operations: u32 = 0;
    
    for (frames, 0..) |*frame, i| {
        if (await frame) |result| {
            std.debug.print("✅ 操作 {} 最终成功: {s}\n", .{ operations[i], result });
            successful_operations += 1;
        } else |err| {
            std.debug.print("❌ 操作 {} 最终失败: {}\n", .{ operations[i], err });
            failed_operations += 1;
        }
    }
    
    std.debug.print("\n统计结果:\n");
    std.debug.print("成功: {} 个操作\n", .{successful_operations});
    std.debug.print("失败: {} 个操作\n", .{failed_operations});
}

异步编程最佳实践

1. 合理使用 suspend 和 resume

zig
const std = @import("std");

// 协作式多任务
async fn cooperativeTask(id: u32, work_units: u32) void {
    std.debug.print("协作任务 {} 开始 ({} 个工作单元)\n", .{ id, work_units });
    
    for (0..work_units) |i| {
        // 执行一个工作单元
        std.debug.print("任务 {} 执行工作单元 {}\n", .{ id, i + 1 });
        
        // 模拟工作
        std.time.sleep(50000000); // 50ms
        
        // 主动让出控制权,允许其他任务执行
        if ((i + 1) % 2 == 0) {
            suspend {}
        }
    }
    
    std.debug.print("协作任务 {} 完成\n", .{id});
}

pub fn main() void {
    std.debug.print("协作式多任务示例\n");
    
    var task1 = async cooperativeTask(1, 6);
    var task2 = async cooperativeTask(2, 4);
    var task3 = async cooperativeTask(3, 5);
    
    // 等待所有任务完成
    await task1;
    await task2;
    await task3;
    
    std.debug.print("所有协作任务完成\n");
}

2. 异步资源管理

zig
const std = @import("std");

const AsyncResource = struct {
    id: u32,
    is_acquired: bool,
    
    const Self = @This();
    
    pub fn init(id: u32) Self {
        return Self{
            .id = id,
            .is_acquired = false,
        };
    }
    
    pub async fn acquire(self: *Self) !void {
        std.debug.print("尝试获取资源 {}\n", .{self.id});
        
        // 模拟资源获取延迟
        std.time.sleep(100000000); // 100ms
        
        if (self.is_acquired) {
            return error.ResourceBusy;
        }
        
        self.is_acquired = true;
        std.debug.print("资源 {} 获取成功\n", .{self.id});
    }
    
    pub fn release(self: *Self) void {
        if (self.is_acquired) {
            self.is_acquired = false;
            std.debug.print("资源 {} 已释放\n", .{self.id});
        }
    }
};

async fn useResource(resource: *AsyncResource, user_id: u32) !void {
    std.debug.print("用户 {} 请求使用资源 {}\n", .{ user_id, resource.id });
    
    // 获取资源
    try await resource.acquire();
    defer resource.release(); // 确保资源被释放
    
    // 使用资源
    std.debug.print("用户 {} 正在使用资源 {}\n", .{ user_id, resource.id });
    std.time.sleep(200000000); // 200ms
    
    std.debug.print("用户 {} 使用资源 {} 完成\n", .{ user_id, resource.id });
}

pub fn main() !void {
    std.debug.print("异步资源管理示例\n");
    
    var resource = AsyncResource.init(1);
    
    var user1_frame = async useResource(&resource, 1);
    var user2_frame = async useResource(&resource, 2);
    
    // 等待所有用户完成
    try await user1_frame;
    try await user2_frame;
    
    std.debug.print("资源管理示例完成\n");
}

总结

本章介绍了 Zig 异步编程的基础知识:

  • ✅ async/await 语法和协程概念
  • ✅ 异步 I/O 操作
  • ✅ 异步网络编程基础
  • ✅ 任务调度和管理
  • ✅ 异步错误处理
  • ✅ 最佳实践和资源管理

Zig 的异步编程模型提供了强大而灵活的并发处理能力,适合构建高性能的异步应用程序。需要注意的是,Zig 的异步功能仍在发展中,某些 API 可能会在未来版本中发生变化。

在下一章中,我们将学习 Zig 与 C 语言的交互。

本站内容仅供学习和研究使用。