Skip to content

Go 语言协程(Goroutine)

Goroutine 是 Go 语言并发编程的核心,它是一种轻量级的执行线程。结合 channels,goroutines 构成了 Go 独特的并发模型。

📋 Goroutine 基础

创建和启动 Goroutine

go
package main

import (
    "fmt"
    "runtime"
    "time"
)

// 普通函数
func sayHello(name string, times int) {
    for i := 0; i < times; i++ {
        fmt.Printf("Hello %s! (%d)\n", name, i+1)
        time.Sleep(100 * time.Millisecond)
    }
}

func demonstrateBasicGoroutines() {
    fmt.Println("=== 基本 Goroutine 演示 ===")
    
    fmt.Printf("开始时 Goroutines 数量: %d\n", runtime.NumGoroutine())
    
    // 1. 普通函数调用
    fmt.Println("顺序调用:")
    sayHello("Alice", 2)
    
    // 2. 作为 goroutine 调用
    fmt.Println("\n作为 Goroutine 调用:")
    go sayHello("Bob", 3)
    go sayHello("Charlie", 2)
    
    // 3. 匿名函数 goroutine
    go func(name string) {
        for i := 0; i < 2; i++ {
            fmt.Printf("匿名 goroutine: %s (%d)\n", name, i+1)
            time.Sleep(150 * time.Millisecond)
        }
    }("David")
    
    fmt.Printf("启动后 Goroutines 数量: %d\n", runtime.NumGoroutine())
    
    // 等待 goroutines 完成
    time.Sleep(800 * time.Millisecond)
    
    fmt.Printf("结束时 Goroutines 数量: %d\n", runtime.NumGoroutine())
}

func main() {
    demonstrateBasicGoroutines()
}

📡 Channel(通道)

基本通道操作

go
package main

import (
    "fmt"
    "time"
)

// 基本通道使用
func basicChannelDemo() {
    fmt.Println("=== 基本通道演示 ===")
    
    // 创建无缓冲通道
    ch := make(chan string)
    
    // 发送方 goroutine
    go func() {
        messages := []string{"第一条消息", "第二条消息", "第三条消息"}
        
        for _, msg := range messages {
            fmt.Printf("发送: %s\n", msg)
            ch <- msg // 发送数据到通道
            time.Sleep(100 * time.Millisecond)
        }
        
        close(ch) // 关闭通道
    }()
    
    // 接收方(主 goroutine)
    fmt.Println("开始接收消息:")
    for msg := range ch { // 从通道接收数据直到关闭
        fmt.Printf("接收: %s\n", msg)
        time.Sleep(50 * time.Millisecond)
    }
    
    fmt.Println("通道已关闭,接收完成")
}

// 缓冲通道 vs 无缓冲通道
func bufferedVsUnbuffered() {
    fmt.Println("\n=== 缓冲通道 vs 无缓冲通道 ===")
    
    // 无缓冲通道(同步)
    fmt.Println("无缓冲通道:")
    unbuffered := make(chan int)
    
    go func() {
        for i := 1; i <= 3; i++ {
            fmt.Printf("发送到无缓冲通道: %d\n", i)
            unbuffered <- i
            fmt.Printf("发送完成: %d\n", i)
        }
        close(unbuffered)
    }()
    
    time.Sleep(200 * time.Millisecond) // 稍等片刻
    
    for value := range unbuffered {
        fmt.Printf("从无缓冲通道接收: %d\n", value)
        time.Sleep(100 * time.Millisecond)
    }
    
    // 缓冲通道(异步)
    fmt.Println("\n缓冲通道:")
    buffered := make(chan int, 3) // 缓冲区大小为 3
    
    go func() {
        for i := 1; i <= 3; i++ {
            fmt.Printf("发送到缓冲通道: %d\n", i)
            buffered <- i
            fmt.Printf("发送完成: %d\n", i)
        }
        close(buffered)
    }()
    
    time.Sleep(200 * time.Millisecond) // 稍等片刻
    
    for value := range buffered {
        fmt.Printf("从缓冲通道接收: %d\n", value)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    basicChannelDemo()
    bufferedVsUnbuffered()
}

Select 语句

go
package main

import (
    "fmt"
    "time"
)

// 基本 select 使用
func basicSelect() {
    fmt.Println("=== 基本 Select 演示 ===")
    
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    // 第一个 goroutine
    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "来自通道1的数据"
    }()
    
    // 第二个 goroutine
    go func() {
        time.Sleep(200 * time.Millisecond)
        ch2 <- "来自通道2的数据"
    }()
    
    // 使用 select 等待多个通道
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Printf("收到: %s\n", msg1)
        case msg2 := <-ch2:
            fmt.Printf("收到: %s\n", msg2)
        }
    }
}

// 超时处理
func timeoutHandling() {
    fmt.Println("\n=== 超时处理 ===")
    
    ch := make(chan string)
    
    // 模拟慢操作
    go func() {
        time.Sleep(300 * time.Millisecond)
        ch <- "慢操作完成"
    }()
    
    // 带超时的接收
    select {
    case msg := <-ch:
        fmt.Printf("及时收到: %s\n", msg)
    case <-time.After(200 * time.Millisecond):
        fmt.Println("操作超时")
    }
    
    // 等待实际完成
    time.Sleep(200 * time.Millisecond)
    select {
    case msg := <-ch:
        fmt.Printf("最终收到: %s\n", msg)
    default:
        fmt.Println("没有数据")
    }
}

// 非阻塞操作
func nonBlockingOperations() {
    fmt.Println("\n=== 非阻塞操作 ===")
    
    ch := make(chan string, 1)
    
    // 非阻塞发送
    select {
    case ch <- "可以发送":
        fmt.Println("成功发送到通道")
    default:
        fmt.Println("通道忙碌,无法发送")
    }
    
    // 非阻塞接收
    select {
    case msg := <-ch:
        fmt.Printf("接收到: %s\n", msg)
    default:
        fmt.Println("通道为空,无法接收")
    }
}

func main() {
    basicSelect()
    timeoutHandling()
    nonBlockingOperations()
}

🔄 并发模式

工作池模式

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// 任务结构
type Task struct {
    ID   int
    Data string
}

// 工作池演示
func workerPoolDemo() {
    fmt.Println("=== 工作池模式演示 ===")
    
    const numWorkers = 3
    const numJobs = 10
    
    jobs := make(chan Task, numJobs)
    results := make(chan string, numJobs)
    
    var wg sync.WaitGroup
    
    // 启动工作者
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            for task := range jobs {
                fmt.Printf("Worker %d 开始处理任务 %d\n", id, task.ID)
                
                // 模拟工作
                time.Sleep(100 * time.Millisecond)
                
                result := fmt.Sprintf("Worker %d 完成任务 %d: %s", id, task.ID, task.Data)
                results <- result
                
                fmt.Printf("Worker %d 完成任务 %d\n", id, task.ID)
            }
        }(w)
    }
    
    // 发送任务
    go func() {
        for j := 1; j <= numJobs; j++ {
            task := Task{
                ID:   j,
                Data: fmt.Sprintf("数据_%d", j),
            }
            jobs <- task
        }
        close(jobs)
    }()
    
    // 等待所有工作者完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    fmt.Println("收集结果:")
    for result := range results {
        fmt.Printf("  %s\n", result)
    }
}

func main() {
    workerPoolDemo()
}

生产者-消费者模式

go
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 生产者-消费者演示
func producerConsumerDemo() {
    fmt.Println("=== 生产者-消费者模式 ===")
    
    const bufferSize = 3
    const numProducers = 2
    const numConsumers = 2
    
    queue := make(chan int, bufferSize)
    var wg sync.WaitGroup
    
    // 启动生产者
    for i := 0; i < numProducers; i++ {
        wg.Add(1)
        go func(producerID int) {
            defer wg.Done()
            
            for j := 0; j < 5; j++ {
                item := producerID*10 + j
                fmt.Printf("🏭 生产者 %d 生产: %d\n", producerID, item)
                queue <- item
                time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
            }
        }(i)
    }
    
    // 启动消费者
    for i := 0; i < numConsumers; i++ {
        wg.Add(1)
        go func(consumerID int) {
            defer wg.Done()
            
            for item := range queue {
                fmt.Printf("🔧 消费者 %d 消费: %d\n", consumerID, item)
                time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
            }
        }(i)
    }
    
    // 等待所有生产者完成后关闭队列
    go func() {
        wg.Wait()
        close(queue)
    }()
    
    // 等待所有消费者完成
    wg.Wait()
    fmt.Println("所有任务完成")
}

func main() {
    rand.Seed(time.Now().UnixNano())
    producerConsumerDemo()
}

🎯 Context(上下文)

Context 基础使用

go
package main

import (
    "context"
    "fmt"
    "time"
)

// 使用 context 进行取消
func contextCancellation() {
    fmt.Println("=== Context 取消演示 ===")
    
    // 创建可取消的上下文
    ctx, cancel := context.WithCancel(context.Background())
    
    // 启动工作 goroutine
    go func() {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("🛑 工作被取消:", ctx.Err())
                return
            default:
                fmt.Println("🔧 正在工作...")
                time.Sleep(200 * time.Millisecond)
            }
        }
    }()
    
    // 运行一段时间后取消
    time.Sleep(600 * time.Millisecond)
    fmt.Println("📢 发送取消信号")
    cancel()
    
    time.Sleep(100 * time.Millisecond)
}

// 使用 context 进行超时控制
func contextTimeout() {
    fmt.Println("\n=== Context 超时演示 ===")
    
    // 创建带超时的上下文
    ctx, cancel := context.WithTimeout(context.Background(), 400*time.Millisecond)
    defer cancel()
    
    // 启动工作 goroutine
    result := make(chan string, 1)
    
    go func() {
        // 模拟耗时操作
        time.Sleep(600 * time.Millisecond)
        result <- "工作完成"
    }()
    
    // 等待结果或超时
    select {
    case <-ctx.Done():
        fmt.Println("⏰ 操作超时:", ctx.Err())
    case res := <-result:
        fmt.Println("✅", res)
    }
}

// 使用 context 传递值
func contextValues() {
    fmt.Println("\n=== Context 传递值演示 ===")
    
    // 创建带值的上下文
    ctx := context.WithValue(context.Background(), "userID", "user123")
    ctx = context.WithValue(ctx, "requestID", "req456")
    
    // 模拟处理请求
    processRequest(ctx)
}

func processRequest(ctx context.Context) {
    userID := ctx.Value("userID")
    requestID := ctx.Value("requestID")
    
    fmt.Printf("🔍 处理请求 - 用户ID: %v, 请求ID: %v\n", userID, requestID)
    
    // 继续传递给其他函数
    fetchUserData(ctx)
}

func fetchUserData(ctx context.Context) {
    userID := ctx.Value("userID")
    fmt.Printf("📊 获取用户 %v 的数据\n", userID)
    
    // 模拟数据库查询
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("✅ 用户 %v 数据获取完成\n", userID)
}

func main() {
    contextCancellation()
    contextTimeout()
    contextValues()
}

🛠️ 实际应用示例

并发 Web 爬虫

go
package main

import (
    "fmt"
    "sync"
    "time"
)

// URL 结构
type URL struct {
    Address string
    Depth   int
}

// 爬虫结果
type CrawlResult struct {
    URL     string
    Content string
    Links   []string
    Error   error
}

// 模拟爬取网页
func crawlPage(url string) CrawlResult {
    // 模拟网络延迟
    time.Sleep(time.Duration(100+len(url)*5) * time.Millisecond)
    
    if url == "http://error.com" {
        return CrawlResult{
            URL:   url,
            Error: fmt.Errorf("网络错误"),
        }
    }
    
    // 模拟网页内容和链接
    content := fmt.Sprintf("网页内容:%s 的内容", url)
    links := []string{
        url + "/page1",
        url + "/page2",
    }
    
    return CrawlResult{
        URL:     url,
        Content: content,
        Links:   links,
    }
}

// 并发爬虫
func concurrentCrawler() {
    fmt.Println("=== 并发网络爬虫演示 ===")
    
    const maxWorkers = 3
    const maxDepth = 2
    
    urlQueue := make(chan URL, 10)
    results := make(chan CrawlResult, 10)
    
    var wg sync.WaitGroup
    visited := make(map[string]bool)
    var visitedMux sync.Mutex
    
    // 启动工作者
    for i := 0; i < maxWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            for url := range urlQueue {
                fmt.Printf("🕷️ 工作者 %d 爬取: %s (深度: %d)\n", 
                          workerID, url.Address, url.Depth)
                
                result := crawlPage(url.Address)
                results <- result
                
                // 如果成功且未达到最大深度,添加子链接到队列
                if result.Error == nil && url.Depth < maxDepth {
                    for _, link := range result.Links {
                        visitedMux.Lock()
                        if !visited[link] {
                            visited[link] = true
                            urlQueue <- URL{Address: link, Depth: url.Depth + 1}
                        }
                        visitedMux.Unlock()
                    }
                }
            }
        }(i)
    }
    
    // 初始 URLs
    startURLs := []string{
        "http://example.com",
        "http://test.com",
        "http://error.com",
    }
    
    // 添加初始 URLs
    go func() {
        for _, startURL := range startURLs {
            visitedMux.Lock()
            visited[startURL] = true
            visitedMux.Unlock()
            
            urlQueue <- URL{Address: startURL, Depth: 0}
        }
        
        // 等待一段时间让爬虫工作
        time.Sleep(2 * time.Second)
        close(urlQueue)
    }()
    
    // 收集结果
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 处理结果
    fmt.Println("\n📊 爬取结果:")
    successCount := 0
    errorCount := 0
    
    for result := range results {
        if result.Error != nil {
            fmt.Printf("❌ 错误: %s - %v\n", result.URL, result.Error)
            errorCount++
        } else {
            fmt.Printf("✅ 成功: %s - 找到 %d 个链接\n", 
                      result.URL, len(result.Links))
            successCount++
        }
    }
    
    fmt.Printf("\n📈 统计: 成功 %d, 失败 %d\n", successCount, errorCount)
}

func main() {
    concurrentCrawler()
}

🎓 小结

本章我们深入学习了 Go 语言的协程(Goroutine):

  • Goroutine 基础:创建、启动和管理轻量级线程
  • Channel 通信:无缓冲和缓冲通道的使用
  • Select 语句:多路复用和超时处理
  • 并发模式:工作池、生产者-消费者模式
  • Context 上下文:取消、超时和值传递
  • 实际应用:并发网络爬虫实现

Goroutine 和 Channel 是 Go 并发编程的核心,掌握它们是构建高性能 Go 应用的关键。


接下来,我们将学习 Go 文件处理,了解文件和目录操作的完整指南。

Goroutine 使用建议

  • 合理控制 Goroutine 数量,避免创建过多
  • 使用 Channel 进行通信,避免共享内存
  • 利用 Context 管理 Goroutine 生命周期
  • 注意 Goroutine 泄露,确保正确退出

本站内容仅供学习和研究使用。