Go 语言协程(Goroutine)
Goroutine 是 Go 语言并发编程的核心,它是一种轻量级的执行线程。结合 channels,goroutines 构成了 Go 独特的并发模型。
📋 Goroutine 基础
创建和启动 Goroutine
go
package main
import (
"fmt"
"runtime"
"time"
)
// 普通函数
func sayHello(name string, times int) {
for i := 0; i < times; i++ {
fmt.Printf("Hello %s! (%d)\n", name, i+1)
time.Sleep(100 * time.Millisecond)
}
}
func demonstrateBasicGoroutines() {
fmt.Println("=== 基本 Goroutine 演示 ===")
fmt.Printf("开始时 Goroutines 数量: %d\n", runtime.NumGoroutine())
// 1. 普通函数调用
fmt.Println("顺序调用:")
sayHello("Alice", 2)
// 2. 作为 goroutine 调用
fmt.Println("\n作为 Goroutine 调用:")
go sayHello("Bob", 3)
go sayHello("Charlie", 2)
// 3. 匿名函数 goroutine
go func(name string) {
for i := 0; i < 2; i++ {
fmt.Printf("匿名 goroutine: %s (%d)\n", name, i+1)
time.Sleep(150 * time.Millisecond)
}
}("David")
fmt.Printf("启动后 Goroutines 数量: %d\n", runtime.NumGoroutine())
// 等待 goroutines 完成
time.Sleep(800 * time.Millisecond)
fmt.Printf("结束时 Goroutines 数量: %d\n", runtime.NumGoroutine())
}
func main() {
demonstrateBasicGoroutines()
}📡 Channel(通道)
基本通道操作
go
package main
import (
"fmt"
"time"
)
// 基本通道使用
func basicChannelDemo() {
fmt.Println("=== 基本通道演示 ===")
// 创建无缓冲通道
ch := make(chan string)
// 发送方 goroutine
go func() {
messages := []string{"第一条消息", "第二条消息", "第三条消息"}
for _, msg := range messages {
fmt.Printf("发送: %s\n", msg)
ch <- msg // 发送数据到通道
time.Sleep(100 * time.Millisecond)
}
close(ch) // 关闭通道
}()
// 接收方(主 goroutine)
fmt.Println("开始接收消息:")
for msg := range ch { // 从通道接收数据直到关闭
fmt.Printf("接收: %s\n", msg)
time.Sleep(50 * time.Millisecond)
}
fmt.Println("通道已关闭,接收完成")
}
// 缓冲通道 vs 无缓冲通道
func bufferedVsUnbuffered() {
fmt.Println("\n=== 缓冲通道 vs 无缓冲通道 ===")
// 无缓冲通道(同步)
fmt.Println("无缓冲通道:")
unbuffered := make(chan int)
go func() {
for i := 1; i <= 3; i++ {
fmt.Printf("发送到无缓冲通道: %d\n", i)
unbuffered <- i
fmt.Printf("发送完成: %d\n", i)
}
close(unbuffered)
}()
time.Sleep(200 * time.Millisecond) // 稍等片刻
for value := range unbuffered {
fmt.Printf("从无缓冲通道接收: %d\n", value)
time.Sleep(100 * time.Millisecond)
}
// 缓冲通道(异步)
fmt.Println("\n缓冲通道:")
buffered := make(chan int, 3) // 缓冲区大小为 3
go func() {
for i := 1; i <= 3; i++ {
fmt.Printf("发送到缓冲通道: %d\n", i)
buffered <- i
fmt.Printf("发送完成: %d\n", i)
}
close(buffered)
}()
time.Sleep(200 * time.Millisecond) // 稍等片刻
for value := range buffered {
fmt.Printf("从缓冲通道接收: %d\n", value)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
basicChannelDemo()
bufferedVsUnbuffered()
}Select 语句
go
package main
import (
"fmt"
"time"
)
// 基本 select 使用
func basicSelect() {
fmt.Println("=== 基本 Select 演示 ===")
ch1 := make(chan string)
ch2 := make(chan string)
// 第一个 goroutine
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "来自通道1的数据"
}()
// 第二个 goroutine
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "来自通道2的数据"
}()
// 使用 select 等待多个通道
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Printf("收到: %s\n", msg1)
case msg2 := <-ch2:
fmt.Printf("收到: %s\n", msg2)
}
}
}
// 超时处理
func timeoutHandling() {
fmt.Println("\n=== 超时处理 ===")
ch := make(chan string)
// 模拟慢操作
go func() {
time.Sleep(300 * time.Millisecond)
ch <- "慢操作完成"
}()
// 带超时的接收
select {
case msg := <-ch:
fmt.Printf("及时收到: %s\n", msg)
case <-time.After(200 * time.Millisecond):
fmt.Println("操作超时")
}
// 等待实际完成
time.Sleep(200 * time.Millisecond)
select {
case msg := <-ch:
fmt.Printf("最终收到: %s\n", msg)
default:
fmt.Println("没有数据")
}
}
// 非阻塞操作
func nonBlockingOperations() {
fmt.Println("\n=== 非阻塞操作 ===")
ch := make(chan string, 1)
// 非阻塞发送
select {
case ch <- "可以发送":
fmt.Println("成功发送到通道")
default:
fmt.Println("通道忙碌,无法发送")
}
// 非阻塞接收
select {
case msg := <-ch:
fmt.Printf("接收到: %s\n", msg)
default:
fmt.Println("通道为空,无法接收")
}
}
func main() {
basicSelect()
timeoutHandling()
nonBlockingOperations()
}🔄 并发模式
工作池模式
go
package main
import (
"fmt"
"sync"
"time"
)
// 任务结构
type Task struct {
ID int
Data string
}
// 工作池演示
func workerPoolDemo() {
fmt.Println("=== 工作池模式演示 ===")
const numWorkers = 3
const numJobs = 10
jobs := make(chan Task, numJobs)
results := make(chan string, numJobs)
var wg sync.WaitGroup
// 启动工作者
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for task := range jobs {
fmt.Printf("Worker %d 开始处理任务 %d\n", id, task.ID)
// 模拟工作
time.Sleep(100 * time.Millisecond)
result := fmt.Sprintf("Worker %d 完成任务 %d: %s", id, task.ID, task.Data)
results <- result
fmt.Printf("Worker %d 完成任务 %d\n", id, task.ID)
}
}(w)
}
// 发送任务
go func() {
for j := 1; j <= numJobs; j++ {
task := Task{
ID: j,
Data: fmt.Sprintf("数据_%d", j),
}
jobs <- task
}
close(jobs)
}()
// 等待所有工作者完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
fmt.Println("收集结果:")
for result := range results {
fmt.Printf(" %s\n", result)
}
}
func main() {
workerPoolDemo()
}生产者-消费者模式
go
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 生产者-消费者演示
func producerConsumerDemo() {
fmt.Println("=== 生产者-消费者模式 ===")
const bufferSize = 3
const numProducers = 2
const numConsumers = 2
queue := make(chan int, bufferSize)
var wg sync.WaitGroup
// 启动生产者
for i := 0; i < numProducers; i++ {
wg.Add(1)
go func(producerID int) {
defer wg.Done()
for j := 0; j < 5; j++ {
item := producerID*10 + j
fmt.Printf("🏭 生产者 %d 生产: %d\n", producerID, item)
queue <- item
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
}
}(i)
}
// 启动消费者
for i := 0; i < numConsumers; i++ {
wg.Add(1)
go func(consumerID int) {
defer wg.Done()
for item := range queue {
fmt.Printf("🔧 消费者 %d 消费: %d\n", consumerID, item)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}(i)
}
// 等待所有生产者完成后关闭队列
go func() {
wg.Wait()
close(queue)
}()
// 等待所有消费者完成
wg.Wait()
fmt.Println("所有任务完成")
}
func main() {
rand.Seed(time.Now().UnixNano())
producerConsumerDemo()
}🎯 Context(上下文)
Context 基础使用
go
package main
import (
"context"
"fmt"
"time"
)
// 使用 context 进行取消
func contextCancellation() {
fmt.Println("=== Context 取消演示 ===")
// 创建可取消的上下文
ctx, cancel := context.WithCancel(context.Background())
// 启动工作 goroutine
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("🛑 工作被取消:", ctx.Err())
return
default:
fmt.Println("🔧 正在工作...")
time.Sleep(200 * time.Millisecond)
}
}
}()
// 运行一段时间后取消
time.Sleep(600 * time.Millisecond)
fmt.Println("📢 发送取消信号")
cancel()
time.Sleep(100 * time.Millisecond)
}
// 使用 context 进行超时控制
func contextTimeout() {
fmt.Println("\n=== Context 超时演示 ===")
// 创建带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 400*time.Millisecond)
defer cancel()
// 启动工作 goroutine
result := make(chan string, 1)
go func() {
// 模拟耗时操作
time.Sleep(600 * time.Millisecond)
result <- "工作完成"
}()
// 等待结果或超时
select {
case <-ctx.Done():
fmt.Println("⏰ 操作超时:", ctx.Err())
case res := <-result:
fmt.Println("✅", res)
}
}
// 使用 context 传递值
func contextValues() {
fmt.Println("\n=== Context 传递值演示 ===")
// 创建带值的上下文
ctx := context.WithValue(context.Background(), "userID", "user123")
ctx = context.WithValue(ctx, "requestID", "req456")
// 模拟处理请求
processRequest(ctx)
}
func processRequest(ctx context.Context) {
userID := ctx.Value("userID")
requestID := ctx.Value("requestID")
fmt.Printf("🔍 处理请求 - 用户ID: %v, 请求ID: %v\n", userID, requestID)
// 继续传递给其他函数
fetchUserData(ctx)
}
func fetchUserData(ctx context.Context) {
userID := ctx.Value("userID")
fmt.Printf("📊 获取用户 %v 的数据\n", userID)
// 模拟数据库查询
time.Sleep(100 * time.Millisecond)
fmt.Printf("✅ 用户 %v 数据获取完成\n", userID)
}
func main() {
contextCancellation()
contextTimeout()
contextValues()
}🛠️ 实际应用示例
并发 Web 爬虫
go
package main
import (
"fmt"
"sync"
"time"
)
// URL 结构
type URL struct {
Address string
Depth int
}
// 爬虫结果
type CrawlResult struct {
URL string
Content string
Links []string
Error error
}
// 模拟爬取网页
func crawlPage(url string) CrawlResult {
// 模拟网络延迟
time.Sleep(time.Duration(100+len(url)*5) * time.Millisecond)
if url == "http://error.com" {
return CrawlResult{
URL: url,
Error: fmt.Errorf("网络错误"),
}
}
// 模拟网页内容和链接
content := fmt.Sprintf("网页内容:%s 的内容", url)
links := []string{
url + "/page1",
url + "/page2",
}
return CrawlResult{
URL: url,
Content: content,
Links: links,
}
}
// 并发爬虫
func concurrentCrawler() {
fmt.Println("=== 并发网络爬虫演示 ===")
const maxWorkers = 3
const maxDepth = 2
urlQueue := make(chan URL, 10)
results := make(chan CrawlResult, 10)
var wg sync.WaitGroup
visited := make(map[string]bool)
var visitedMux sync.Mutex
// 启动工作者
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for url := range urlQueue {
fmt.Printf("🕷️ 工作者 %d 爬取: %s (深度: %d)\n",
workerID, url.Address, url.Depth)
result := crawlPage(url.Address)
results <- result
// 如果成功且未达到最大深度,添加子链接到队列
if result.Error == nil && url.Depth < maxDepth {
for _, link := range result.Links {
visitedMux.Lock()
if !visited[link] {
visited[link] = true
urlQueue <- URL{Address: link, Depth: url.Depth + 1}
}
visitedMux.Unlock()
}
}
}
}(i)
}
// 初始 URLs
startURLs := []string{
"http://example.com",
"http://test.com",
"http://error.com",
}
// 添加初始 URLs
go func() {
for _, startURL := range startURLs {
visitedMux.Lock()
visited[startURL] = true
visitedMux.Unlock()
urlQueue <- URL{Address: startURL, Depth: 0}
}
// 等待一段时间让爬虫工作
time.Sleep(2 * time.Second)
close(urlQueue)
}()
// 收集结果
go func() {
wg.Wait()
close(results)
}()
// 处理结果
fmt.Println("\n📊 爬取结果:")
successCount := 0
errorCount := 0
for result := range results {
if result.Error != nil {
fmt.Printf("❌ 错误: %s - %v\n", result.URL, result.Error)
errorCount++
} else {
fmt.Printf("✅ 成功: %s - 找到 %d 个链接\n",
result.URL, len(result.Links))
successCount++
}
}
fmt.Printf("\n📈 统计: 成功 %d, 失败 %d\n", successCount, errorCount)
}
func main() {
concurrentCrawler()
}🎓 小结
本章我们深入学习了 Go 语言的协程(Goroutine):
- ✅ Goroutine 基础:创建、启动和管理轻量级线程
- ✅ Channel 通信:无缓冲和缓冲通道的使用
- ✅ Select 语句:多路复用和超时处理
- ✅ 并发模式:工作池、生产者-消费者模式
- ✅ Context 上下文:取消、超时和值传递
- ✅ 实际应用:并发网络爬虫实现
Goroutine 和 Channel 是 Go 并发编程的核心,掌握它们是构建高性能 Go 应用的关键。
接下来,我们将学习 Go 文件处理,了解文件和目录操作的完整指南。
Goroutine 使用建议
- 合理控制 Goroutine 数量,避免创建过多
- 使用 Channel 进行通信,避免共享内存
- 利用 Context 管理 Goroutine 生命周期
- 注意 Goroutine 泄露,确保正确退出