Skip to content

Ruby 多线程

多线程编程是现代应用程序提高性能和响应性的重要技术。Ruby提供了丰富的线程支持,允许开发者创建并发程序来执行多个任务。虽然Ruby的全局解释器锁(GIL)限制了真正的并行执行,但在I/O密集型任务中,线程仍然能显著提高程序性能。本章将详细介绍Ruby中的多线程编程,包括线程创建、同步、通信和最佳实践。

🎯 线程基础

什么是线程

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含多个线程,这些线程共享进程的内存空间和资源。

Ruby线程特点

  1. 用户级线程: Ruby线程由Ruby解释器管理,不是操作系统线程
  2. 全局解释器锁(GIL): MRI Ruby中存在GIL,限制了CPU密集型任务的并行执行
  3. I/O并发: 在I/O操作期间,线程可以并发执行
  4. 轻量级: Ruby线程比操作系统线程更轻量

🧵 线程创建和管理

基本线程创建

ruby
# 创建简单线程
thread = Thread.new do
  puts "线程开始执行"
  sleep(2)
  puts "线程执行完成"
end

puts "主线程继续执行"
thread.join  # 等待线程完成
puts "所有线程完成"

线程生命周期

ruby
# 线程状态管理
def demonstrate_thread_lifecycle
  puts "创建线程"
  thread = Thread.new do
    puts "线程运行中..."
    sleep(3)
    puts "线程完成"
  end
  
  puts "线程状态: #{thread.status}"  # run
  puts "线程是否存活: #{thread.alive?}"  # true
  
  sleep(1)
  puts "主线程等待..."
  
  thread.join
  puts "线程状态: #{thread.status}"  # false
  puts "线程是否存活: #{thread.alive?}"  # false
end

demonstrate_thread_lifecycle

线程优先级和控制

ruby
# 线程优先级设置
def thread_priority_example
  threads = []
  
  # 创建低优先级线程
  low_priority = Thread.new do
    Thread.current.priority = -1
    5.times do |i|
      puts "低优先级线程: #{i}"
      sleep(0.5)
    end
  end
  threads << low_priority
  
  # 创建高优先级线程
  high_priority = Thread.new do
    Thread.current.priority = 1
    5.times do |i|
      puts "高优先级线程: #{i}"
      sleep(0.5)
    end
  end
  threads << high_priority
  
  # 等待所有线程完成
  threads.each(&:join)
end

# thread_priority_example

# 线程控制
def thread_control_example
  thread = Thread.new do
    loop do
      puts "线程运行中..."
      sleep(1)
    end
  end
  
  puts "线程已启动"
  sleep(3)
  
  puts "终止线程"
  thread.kill
  puts "线程已终止: #{thread.alive?}"  # false
end

# thread_control_example

🔒 线程同步

互斥锁(Mutex)

ruby
# 使用Mutex保护共享资源
require 'thread'

class Counter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end
  
  def increment
    @mutex.synchronize do
      @count += 1
    end
  end
  
  def decrement
    @mutex.synchronize do
      @count -= 1
    end
  end
  
  def value
    @mutex.synchronize do
      @count
    end
  end
end

# 演示线程安全计数器
def safe_counter_example
  counter = Counter.new
  threads = []
  
  # 创建多个线程同时修改计数器
  10.times do
    threads << Thread.new do
      1000.times do
        counter.increment
      end
    end
  end
  
  # 等待所有线程完成
  threads.each(&:join)
  
  puts "期望值: 10000"
  puts "实际值: #{counter.value}"
end

safe_counter_example

条件变量

ruby
# 使用条件变量进行线程间通信
require 'thread'

class ProducerConsumer
  def initialize
    @queue = []
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @max_size = 5
  end
  
  def produce(item)
    @mutex.synchronize do
      # 等待队列有空间
      while @queue.size >= @max_size
        puts "生产者等待..."
        @condition.wait(@mutex)
      end
      
      @queue << item
      puts "生产: #{item} (队列大小: #{@queue.size})"
      
      # 通知消费者
      @condition.signal
    end
  end
  
  def consume
    @mutex.synchronize do
      # 等待队列有数据
      while @queue.empty?
        puts "消费者等待..."
        @condition.wait(@mutex)
      end
      
      item = @queue.shift
      puts "消费: #{item} (队列大小: #{@queue.size})"
      
      # 通知生产者
      @condition.signal
      
      item
    end
  end
end

# 演示生产者消费者模式
def producer_consumer_example
  pc = ProducerConsumer.new
  threads = []
  
  # 生产者线程
  threads << Thread.new do
    10.times do |i|
      pc.produce("商品#{i}")
      sleep(0.5)
    end
  end
  
  # 消费者线程
  threads << Thread.new do
    10.times do
      item = pc.consume
      sleep(1)
    end
  end
  
  threads.each(&:join)
end

# producer_consumer_example

信号量

ruby
# 自定义信号量实现
class Semaphore
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end
  
  def acquire
    @mutex.synchronize do
      while @count <= 0
        @condition.wait(@mutex)
      end
      @count -= 1
    end
  end
  
  def release
    @mutex.synchronize do
      @count += 1
      @condition.signal
    end
  end
end

# 使用信号量限制并发访问
def semaphore_example
  semaphore = Semaphore.new(3)  # 最多3个线程同时访问
  threads = []
  
  10.times do |i|
    threads << Thread.new do
      semaphore.acquire
      begin
        puts "线程#{i}获得访问权限"
        sleep(2)  # 模拟工作
        puts "线程#{i}释放访问权限"
      ensure
        semaphore.release
      end
    end
  end
  
  threads.each(&:join)
end

# semaphore_example

🔄 线程间通信

使用队列通信

ruby
# 使用线程安全的队列
require 'thread'

def queue_communication_example
  queue = Queue.new
  threads = []
  
  # 生产者线程
  threads << Thread.new do
    5.times do |i|
      message = "消息#{i}"
      queue << message
      puts "发送: #{message}"
      sleep(0.5)
    end
    
    # 发送结束信号
    queue << :done
  end
  
  # 消费者线程
  threads << Thread.new do
    loop do
      message = queue.pop
      break if message == :done
      
      puts "接收: #{message}"
      sleep(0.3)
    end
    
    puts "消费者结束"
  end
  
  threads.each(&:join)
end

queue_communication_example

使用 SizedQueue 限制队列大小

ruby
# 有大小限制的队列
require 'thread'

def sized_queue_example
  queue = SizedQueue.new(3)  # 最多容纳3个元素
  threads = []
  
  # 快速生产者
  threads << Thread.new do
    10.times do |i|
      item = "数据#{i}"
      puts "尝试放入: #{item}"
      queue << item  # 当队列满时会阻塞
      puts "成功放入: #{item}"
    end
  end
  
  # 慢速消费者
  threads << Thread.new do
    10.times do
      sleep(1)  # 模拟处理时间
      item = queue.pop
      puts "处理: #{item}"
    end
  end
  
  threads.each(&:join)
end

# sized_queue_example

线程局部变量

ruby
# 线程局部变量
def thread_local_example
  # 设置线程局部变量
  Thread.current[:user_id] = 12345
  Thread.current[:session_id] = 'abc123'
  
  threads = []
  
  3.times do |i|
    threads << Thread.new do
      # 每个线程都有自己的局部变量
      Thread.current[:thread_id] = i
      Thread.current[:user_id] = rand(10000)
      
      sleep(0.5)
      
      puts "线程#{i}:"
      puts "  用户ID: #{Thread.current[:user_id]}"
      puts "  线程ID: #{Thread.current[:thread_id]}"
      puts "  会话ID: #{Thread.current[:session_id] || '未设置'}"
      puts
    end
  end
  
  threads.each(&:join)
  
  # 主线程的变量
  puts "主线程:"
  puts "  用户ID: #{Thread.current[:user_id]}"
  puts "  会话ID: #{Thread.current[:session_id]}"
end

thread_local_example

🎯 实用多线程模式

工作池模式

ruby
# 工作池模式
require 'thread'

class WorkerPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @workers = []
    @mutex = Mutex.new
    @shutdown = false
  end
  
  def start
    @size.times do |i|
      @workers << create_worker(i)
    end
  end
  
  def schedule(&block)
    @mutex.synchronize do
      raise "工作池已关闭" if @shutdown
      @jobs << block
    end
  end
  
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @size.times do
        @jobs << nil  # 发送结束信号
      end
    end
    
    @workers.each(&:join)
  end
  
  private
  
  def create_worker(id)
    Thread.new do
      loop do
        job = @jobs.pop
        break if job.nil?
        
        begin
          job.call
        rescue => e
          puts "工作线程#{id}错误: #{e.message}"
        end
      end
      
      puts "工作线程#{id}结束"
    end
  end
end

# 使用工作池
def worker_pool_example
  pool = WorkerPool.new(3)
  pool.start
  
  # 提交任务
  10.times do |i|
    pool.schedule do
      puts "执行任务#{i}"
      sleep(rand(1..3))
      puts "任务#{i}完成"
    end
  end
  
  # 等待一段时间后关闭
  sleep(5)
  pool.shutdown
end

# worker_pool_example

Future模式

ruby
# Future模式实现
class Future
  def initialize(&block)
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @result = nil
    @error = nil
    @completed = false
    
    # 在新线程中执行任务
    Thread.new do
      begin
        @result = block.call
      rescue => e
        @error = e
      ensure
        @mutex.synchronize do
          @completed = true
          @condition.broadcast
        end
      end
    end
  end
  
  def result
    @mutex.synchronize do
      until @completed
        @condition.wait(@mutex)
      end
      
      raise @error if @error
      @result
    end
  end
  
  def completed?
    @mutex.synchronize { @completed }
  end
end

# 使用Future
def future_example
  # 创建Future执行耗时任务
  future = Future.new do
    puts "开始耗时计算..."
    sleep(3)
    42  # 计算结果
  end
  
  puts "Future已创建,继续其他工作..."
  
  # 做其他事情
  3.times do |i|
    puts "其他工作#{i}"
    sleep(0.5)
  end
  
  # 获取结果(会阻塞直到完成)
  result = future.result
  puts "计算结果: #{result}"
end

# future_example

纤程(Fiber)

ruby
# 纤程示例
def fiber_example
  # 创建纤程
  fiber = Fiber.new do
    puts "纤程开始"
    Fiber.yield 1
    puts "纤程继续"
    Fiber.yield 2
    puts "纤程结束"
    3
  end
  
  puts "主程序"
  puts "第一次恢复: #{fiber.resume}"
  puts "主程序继续"
  puts "第二次恢复: #{fiber.resume}"
  puts "主程序继续"
  puts "第三次恢复: #{fiber.resume}"
  puts "主程序结束"
end

# fiber_example

# 使用纤程实现生成器
class NumberGenerator
  def initialize
    @fiber = Fiber.new do
      num = 1
      loop do
        Fiber.yield num
        num += 1
      end
    end
  end
  
  def next
    @fiber.resume
  end
end

def generator_example
  gen = NumberGenerator.new
  
  10.times do
    puts "生成的数字: #{gen.next}"
  end
end

# generator_example

📊 多线程性能优化

I/O密集型任务并发

ruby
# I/O密集型任务并发处理
require 'net/http'
require 'uri'
require 'benchmark'

# 模拟URL列表
URLS = [
  'http://httpbin.org/delay/1',
  'http://httpbin.org/delay/2',
  'http://httpbin.org/delay/1',
  'http://httpbin.org/delay/3',
  'http://httpbin.org/delay/1'
]

# 串行执行
def sequential_requests
  results = []
  URLS.each do |url|
    uri = URI(url)
    response = Net::HTTP.get_response(uri)
    results << response.code
  end
  results
end

# 并行执行
def parallel_requests
  threads = []
  results = Queue.new
  
  URLS.each do |url|
    threads << Thread.new do
      uri = URI(url)
      response = Net::HTTP.get_response(uri)
      results << response.code
    end
  end
  
  threads.each(&:join)
  
  # 收集结果
  result_array = []
  results.size.times { result_array << results.pop }
  result_array
end

# 性能比较
def performance_comparison
  puts "=== 性能比较 ==="
  
  time = Benchmark.measure do
    results = sequential_requests
    puts "串行执行结果: #{results}"
  end
  puts "串行执行时间: #{time.real.round(2)}秒"
  
  puts
  
  time = Benchmark.measure do
    results = parallel_requests
    puts "并行执行结果: #{results}"
  end
  puts "并行执行时间: #{time.real.round(2)}秒"
end

# performance_comparison

线程池优化

ruby
# 高效线程池实现
require 'thread'

class ThreadPool
  def initialize(min_threads = 2, max_threads = 10)
    @min_threads = min_threads
    @max_threads = max_threads
    @jobs = Queue.new
    @workers = []
    @mutex = Mutex.new
    @shutdown = false
    @active_workers = 0
    
    # 初始化最小线程数
    @min_threads.times { add_worker }
  end
  
  def schedule(&block)
    @mutex.synchronize do
      raise "线程池已关闭" if @shutdown
      
      # 如果队列中有等待任务且未达到最大线程数,则添加新线程
      if @jobs.size > 0 && @workers.size < @max_threads
        add_worker
      end
      
      @jobs << block
    end
  end
  
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      # 发送结束信号给所有工作线程
      @workers.size.times { @jobs << nil }
    end
    
    @workers.each(&:join)
  end
  
  private
  
  def add_worker
    worker_id = @workers.size
    @workers << Thread.new do
      loop do
        job = @jobs.pop
        break if job.nil?
        
        @mutex.synchronize { @active_workers += 1 }
        begin
          job.call
        rescue => e
          puts "工作线程#{worker_id}错误: #{e.message}"
        ensure
          @mutex.synchronize { @active_workers -= 1 }
        end
      end
    end
  end
end

# 使用线程池处理任务
def thread_pool_example
  pool = ThreadPool.new(3, 5)
  
  # 提交大量任务
  20.times do |i|
    pool.schedule do
      puts "执行任务#{i} (线程: #{Thread.current.object_id})"
      sleep(rand(1..2))
      puts "任务#{i}完成"
    end
  end
  
  # 等待一段时间后关闭
  sleep(10)
  pool.shutdown
end

# thread_pool_example

🔍 线程调试和监控

线程状态监控

ruby
# 线程监控工具
class ThreadMonitor
  def self.list_threads
    puts "=== 当前线程列表 ==="
    Thread.list.each_with_index do |thread, index|
      puts "#{index + 1}. 线程ID: #{thread.object_id}"
      puts "   状态: #{thread.status}"
      puts "   是否存活: #{thread.alive?}"
      puts "   优先级: #{thread.priority}"
      puts "   组: #{thread.group}"
      puts "   键: #{thread.keys}" unless thread.keys.empty?
      puts
    end
  end
  
  def self.thread_stats
    threads = Thread.list
    alive_count = threads.count(&:alive?)
    sleeping_count = threads.count { |t| t.status == 'sleep' }
    
    {
      total: threads.length,
      alive: alive_count,
      dead: threads.length - alive_count,
      sleeping: sleeping_count
    }
  end
  
  def self.kill_all_except_main
    Thread.list.each do |thread|
      next if thread == Thread.main
      thread.kill if thread.alive?
    end
  end
end

# 使用线程监控
def thread_monitoring_example
  threads = []
  
  3.times do |i|
    threads << Thread.new do
      puts "线程#{i}开始"
      sleep(5)
      puts "线程#{i}结束"
    end
  end
  
  # 显示线程信息
  ThreadMonitor.list_threads
  puts "线程统计: #{ThreadMonitor.thread_stats}"
  
  sleep(2)
  threads.each(&:join)
end

# thread_monitoring_example

死锁检测

ruby
# 死锁检测示例
require 'thread'

class DeadlockDetector
  def initialize
    @mutexes = {}
    @mutex = Mutex.new
  end
  
  def register_mutex(mutex, name)
    @mutex.synchronize do
      @mutexes[mutex.object_id] = {
        name: name,
        owner: nil,
        waiters: []
      }
    end
  end
  
  def acquire(mutex, timeout = 5)
    # 简化的死锁检测逻辑
    mutex.lock
  end
end

# 安全的互斥锁包装器
class SafeMutex
  def initialize(name)
    @mutex = Mutex.new
    @name = name
  end
  
  def synchronize
    start_time = Time.now
    @mutex.lock
    yield
  ensure
    @mutex.unlock
  end
end

🎯 多线程最佳实践

1. 避免共享状态

ruby
# 不好的做法:共享可变状态
@shared_counter = 0
@shared_array = []

# 好的做法:使用线程安全的数据结构
require 'thread'
@mutex = Mutex.new
@thread_safe_array = Queue.new

# 或者使用不可变数据
class ImmutableData
  attr_reader :data
  
  def initialize(data)
    @data = data.freeze
  end
  
  def update(new_data)
    self.class.new(new_data)
  end
end

2. 正确处理异常

ruby
# 线程异常处理
def safe_thread_example
  thread = Thread.new do
    begin
      # 可能出错的代码
      raise "模拟错误"
    rescue => e
      puts "线程内捕获异常: #{e.message}"
      # 记录日志或处理错误
    ensure
      puts "线程清理工作"
    end
  end
  
  # 设置线程异常处理器
  thread.abort_on_exception = false  # 防止异常终止整个程序
  
  thread.join
  
  # 检查线程是否正常结束
  if thread.status.nil?
    puts "线程正常结束"
  elsif thread.status == false
    puts "线程异常终止"
  end
end

# safe_thread_example

3. 资源管理

ruby
# 线程安全的资源管理
class ResourceManager
  def initialize
    @resources = Queue.new
    @mutex = Mutex.new
    @created_count = 0
  end
  
  def acquire
    resource = @resources.pop(true) rescue nil
    
    if resource.nil?
      @mutex.synchronize do
        if @created_count < 10  # 最大资源数
          resource = create_resource
          @created_count += 1
        end
      end
    end
    
    resource
  end
  
  def release(resource)
    @resources << resource
  end
  
  private
  
  def create_resource
    # 创建资源的逻辑
    Object.new
  end
end

# 使用资源管理器
def resource_management_example
  manager = ResourceManager.new
  
  threads = []
  15.times do |i|
    threads << Thread.new do
      resource = manager.acquire
      if resource
        puts "线程#{i}获得资源"
        sleep(rand(1..3))
        manager.release(resource)
        puts "线程#{i}释放资源"
      else
        puts "线程#{i}无法获得资源"
      end
    end
  end
  
  threads.each(&:join)
end

# resource_management_example

4. 测试多线程代码

ruby
# 多线程代码测试示例
require 'test/unit'

class ThreadSafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end
  
  def increment
    @mutex.synchronize { @count += 1 }
  end
  
  def value
    @mutex.synchronize { @count }
  end
end

class TestThreadSafeCounter < Test::Unit::TestCase
  def test_concurrent_increment
    counter = ThreadSafeCounter.new
    threads = []
    
    # 创建多个线程同时增加计数器
    10.times do
      threads << Thread.new do
        100.times { counter.increment }
      end
    end
    
    # 等待所有线程完成
    threads.each(&:join)
    
    # 验证结果
    assert_equal 1000, counter.value
  end
  
  def test_thread_safety_under_load
    counter = ThreadSafeCounter.new
    threads = []
    
    # 大量并发操作
    50.times do
      threads << Thread.new do
        rand(50..100).times { counter.increment }
      end
    end
    
    threads.each(&:join)
    
    # 结果应该在预期范围内
    assert counter.value > 0
    assert counter.value <= 5000
  end
end

📚 下一步学习

掌握了Ruby多线程编程后,建议继续学习:

继续您的Ruby学习之旅吧!

本站内容仅供学习和研究使用。