Ruby 多线程
多线程编程是现代应用程序提高性能和响应性的重要技术。Ruby提供了丰富的线程支持,允许开发者创建并发程序来执行多个任务。虽然Ruby的全局解释器锁(GIL)限制了真正的并行执行,但在I/O密集型任务中,线程仍然能显著提高程序性能。本章将详细介绍Ruby中的多线程编程,包括线程创建、同步、通信和最佳实践。
🎯 线程基础
什么是线程
线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含多个线程,这些线程共享进程的内存空间和资源。
Ruby线程特点
- 用户级线程: Ruby线程由Ruby解释器管理,不是操作系统线程
- 全局解释器锁(GIL): MRI Ruby中存在GIL,限制了CPU密集型任务的并行执行
- I/O并发: 在I/O操作期间,线程可以并发执行
- 轻量级: Ruby线程比操作系统线程更轻量
🧵 线程创建和管理
基本线程创建
ruby
# 创建简单线程
thread = Thread.new do
puts "线程开始执行"
sleep(2)
puts "线程执行完成"
end
puts "主线程继续执行"
thread.join # 等待线程完成
puts "所有线程完成"线程生命周期
ruby
# 线程状态管理
def demonstrate_thread_lifecycle
puts "创建线程"
thread = Thread.new do
puts "线程运行中..."
sleep(3)
puts "线程完成"
end
puts "线程状态: #{thread.status}" # run
puts "线程是否存活: #{thread.alive?}" # true
sleep(1)
puts "主线程等待..."
thread.join
puts "线程状态: #{thread.status}" # false
puts "线程是否存活: #{thread.alive?}" # false
end
demonstrate_thread_lifecycle线程优先级和控制
ruby
# 线程优先级设置
def thread_priority_example
threads = []
# 创建低优先级线程
low_priority = Thread.new do
Thread.current.priority = -1
5.times do |i|
puts "低优先级线程: #{i}"
sleep(0.5)
end
end
threads << low_priority
# 创建高优先级线程
high_priority = Thread.new do
Thread.current.priority = 1
5.times do |i|
puts "高优先级线程: #{i}"
sleep(0.5)
end
end
threads << high_priority
# 等待所有线程完成
threads.each(&:join)
end
# thread_priority_example
# 线程控制
def thread_control_example
thread = Thread.new do
loop do
puts "线程运行中..."
sleep(1)
end
end
puts "线程已启动"
sleep(3)
puts "终止线程"
thread.kill
puts "线程已终止: #{thread.alive?}" # false
end
# thread_control_example🔒 线程同步
互斥锁(Mutex)
ruby
# 使用Mutex保护共享资源
require 'thread'
class Counter
def initialize
@count = 0
@mutex = Mutex.new
end
def increment
@mutex.synchronize do
@count += 1
end
end
def decrement
@mutex.synchronize do
@count -= 1
end
end
def value
@mutex.synchronize do
@count
end
end
end
# 演示线程安全计数器
def safe_counter_example
counter = Counter.new
threads = []
# 创建多个线程同时修改计数器
10.times do
threads << Thread.new do
1000.times do
counter.increment
end
end
end
# 等待所有线程完成
threads.each(&:join)
puts "期望值: 10000"
puts "实际值: #{counter.value}"
end
safe_counter_example条件变量
ruby
# 使用条件变量进行线程间通信
require 'thread'
class ProducerConsumer
def initialize
@queue = []
@mutex = Mutex.new
@condition = ConditionVariable.new
@max_size = 5
end
def produce(item)
@mutex.synchronize do
# 等待队列有空间
while @queue.size >= @max_size
puts "生产者等待..."
@condition.wait(@mutex)
end
@queue << item
puts "生产: #{item} (队列大小: #{@queue.size})"
# 通知消费者
@condition.signal
end
end
def consume
@mutex.synchronize do
# 等待队列有数据
while @queue.empty?
puts "消费者等待..."
@condition.wait(@mutex)
end
item = @queue.shift
puts "消费: #{item} (队列大小: #{@queue.size})"
# 通知生产者
@condition.signal
item
end
end
end
# 演示生产者消费者模式
def producer_consumer_example
pc = ProducerConsumer.new
threads = []
# 生产者线程
threads << Thread.new do
10.times do |i|
pc.produce("商品#{i}")
sleep(0.5)
end
end
# 消费者线程
threads << Thread.new do
10.times do
item = pc.consume
sleep(1)
end
end
threads.each(&:join)
end
# producer_consumer_example信号量
ruby
# 自定义信号量实现
class Semaphore
def initialize(count)
@count = count
@mutex = Mutex.new
@condition = ConditionVariable.new
end
def acquire
@mutex.synchronize do
while @count <= 0
@condition.wait(@mutex)
end
@count -= 1
end
end
def release
@mutex.synchronize do
@count += 1
@condition.signal
end
end
end
# 使用信号量限制并发访问
def semaphore_example
semaphore = Semaphore.new(3) # 最多3个线程同时访问
threads = []
10.times do |i|
threads << Thread.new do
semaphore.acquire
begin
puts "线程#{i}获得访问权限"
sleep(2) # 模拟工作
puts "线程#{i}释放访问权限"
ensure
semaphore.release
end
end
end
threads.each(&:join)
end
# semaphore_example🔄 线程间通信
使用队列通信
ruby
# 使用线程安全的队列
require 'thread'
def queue_communication_example
queue = Queue.new
threads = []
# 生产者线程
threads << Thread.new do
5.times do |i|
message = "消息#{i}"
queue << message
puts "发送: #{message}"
sleep(0.5)
end
# 发送结束信号
queue << :done
end
# 消费者线程
threads << Thread.new do
loop do
message = queue.pop
break if message == :done
puts "接收: #{message}"
sleep(0.3)
end
puts "消费者结束"
end
threads.each(&:join)
end
queue_communication_example使用 SizedQueue 限制队列大小
ruby
# 有大小限制的队列
require 'thread'
def sized_queue_example
queue = SizedQueue.new(3) # 最多容纳3个元素
threads = []
# 快速生产者
threads << Thread.new do
10.times do |i|
item = "数据#{i}"
puts "尝试放入: #{item}"
queue << item # 当队列满时会阻塞
puts "成功放入: #{item}"
end
end
# 慢速消费者
threads << Thread.new do
10.times do
sleep(1) # 模拟处理时间
item = queue.pop
puts "处理: #{item}"
end
end
threads.each(&:join)
end
# sized_queue_example线程局部变量
ruby
# 线程局部变量
def thread_local_example
# 设置线程局部变量
Thread.current[:user_id] = 12345
Thread.current[:session_id] = 'abc123'
threads = []
3.times do |i|
threads << Thread.new do
# 每个线程都有自己的局部变量
Thread.current[:thread_id] = i
Thread.current[:user_id] = rand(10000)
sleep(0.5)
puts "线程#{i}:"
puts " 用户ID: #{Thread.current[:user_id]}"
puts " 线程ID: #{Thread.current[:thread_id]}"
puts " 会话ID: #{Thread.current[:session_id] || '未设置'}"
puts
end
end
threads.each(&:join)
# 主线程的变量
puts "主线程:"
puts " 用户ID: #{Thread.current[:user_id]}"
puts " 会话ID: #{Thread.current[:session_id]}"
end
thread_local_example🎯 实用多线程模式
工作池模式
ruby
# 工作池模式
require 'thread'
class WorkerPool
def initialize(size)
@size = size
@jobs = Queue.new
@workers = []
@mutex = Mutex.new
@shutdown = false
end
def start
@size.times do |i|
@workers << create_worker(i)
end
end
def schedule(&block)
@mutex.synchronize do
raise "工作池已关闭" if @shutdown
@jobs << block
end
end
def shutdown
@mutex.synchronize do
@shutdown = true
@size.times do
@jobs << nil # 发送结束信号
end
end
@workers.each(&:join)
end
private
def create_worker(id)
Thread.new do
loop do
job = @jobs.pop
break if job.nil?
begin
job.call
rescue => e
puts "工作线程#{id}错误: #{e.message}"
end
end
puts "工作线程#{id}结束"
end
end
end
# 使用工作池
def worker_pool_example
pool = WorkerPool.new(3)
pool.start
# 提交任务
10.times do |i|
pool.schedule do
puts "执行任务#{i}"
sleep(rand(1..3))
puts "任务#{i}完成"
end
end
# 等待一段时间后关闭
sleep(5)
pool.shutdown
end
# worker_pool_exampleFuture模式
ruby
# Future模式实现
class Future
def initialize(&block)
@mutex = Mutex.new
@condition = ConditionVariable.new
@result = nil
@error = nil
@completed = false
# 在新线程中执行任务
Thread.new do
begin
@result = block.call
rescue => e
@error = e
ensure
@mutex.synchronize do
@completed = true
@condition.broadcast
end
end
end
end
def result
@mutex.synchronize do
until @completed
@condition.wait(@mutex)
end
raise @error if @error
@result
end
end
def completed?
@mutex.synchronize { @completed }
end
end
# 使用Future
def future_example
# 创建Future执行耗时任务
future = Future.new do
puts "开始耗时计算..."
sleep(3)
42 # 计算结果
end
puts "Future已创建,继续其他工作..."
# 做其他事情
3.times do |i|
puts "其他工作#{i}"
sleep(0.5)
end
# 获取结果(会阻塞直到完成)
result = future.result
puts "计算结果: #{result}"
end
# future_example纤程(Fiber)
ruby
# 纤程示例
def fiber_example
# 创建纤程
fiber = Fiber.new do
puts "纤程开始"
Fiber.yield 1
puts "纤程继续"
Fiber.yield 2
puts "纤程结束"
3
end
puts "主程序"
puts "第一次恢复: #{fiber.resume}"
puts "主程序继续"
puts "第二次恢复: #{fiber.resume}"
puts "主程序继续"
puts "第三次恢复: #{fiber.resume}"
puts "主程序结束"
end
# fiber_example
# 使用纤程实现生成器
class NumberGenerator
def initialize
@fiber = Fiber.new do
num = 1
loop do
Fiber.yield num
num += 1
end
end
end
def next
@fiber.resume
end
end
def generator_example
gen = NumberGenerator.new
10.times do
puts "生成的数字: #{gen.next}"
end
end
# generator_example📊 多线程性能优化
I/O密集型任务并发
ruby
# I/O密集型任务并发处理
require 'net/http'
require 'uri'
require 'benchmark'
# 模拟URL列表
URLS = [
'http://httpbin.org/delay/1',
'http://httpbin.org/delay/2',
'http://httpbin.org/delay/1',
'http://httpbin.org/delay/3',
'http://httpbin.org/delay/1'
]
# 串行执行
def sequential_requests
results = []
URLS.each do |url|
uri = URI(url)
response = Net::HTTP.get_response(uri)
results << response.code
end
results
end
# 并行执行
def parallel_requests
threads = []
results = Queue.new
URLS.each do |url|
threads << Thread.new do
uri = URI(url)
response = Net::HTTP.get_response(uri)
results << response.code
end
end
threads.each(&:join)
# 收集结果
result_array = []
results.size.times { result_array << results.pop }
result_array
end
# 性能比较
def performance_comparison
puts "=== 性能比较 ==="
time = Benchmark.measure do
results = sequential_requests
puts "串行执行结果: #{results}"
end
puts "串行执行时间: #{time.real.round(2)}秒"
puts
time = Benchmark.measure do
results = parallel_requests
puts "并行执行结果: #{results}"
end
puts "并行执行时间: #{time.real.round(2)}秒"
end
# performance_comparison线程池优化
ruby
# 高效线程池实现
require 'thread'
class ThreadPool
def initialize(min_threads = 2, max_threads = 10)
@min_threads = min_threads
@max_threads = max_threads
@jobs = Queue.new
@workers = []
@mutex = Mutex.new
@shutdown = false
@active_workers = 0
# 初始化最小线程数
@min_threads.times { add_worker }
end
def schedule(&block)
@mutex.synchronize do
raise "线程池已关闭" if @shutdown
# 如果队列中有等待任务且未达到最大线程数,则添加新线程
if @jobs.size > 0 && @workers.size < @max_threads
add_worker
end
@jobs << block
end
end
def shutdown
@mutex.synchronize do
@shutdown = true
# 发送结束信号给所有工作线程
@workers.size.times { @jobs << nil }
end
@workers.each(&:join)
end
private
def add_worker
worker_id = @workers.size
@workers << Thread.new do
loop do
job = @jobs.pop
break if job.nil?
@mutex.synchronize { @active_workers += 1 }
begin
job.call
rescue => e
puts "工作线程#{worker_id}错误: #{e.message}"
ensure
@mutex.synchronize { @active_workers -= 1 }
end
end
end
end
end
# 使用线程池处理任务
def thread_pool_example
pool = ThreadPool.new(3, 5)
# 提交大量任务
20.times do |i|
pool.schedule do
puts "执行任务#{i} (线程: #{Thread.current.object_id})"
sleep(rand(1..2))
puts "任务#{i}完成"
end
end
# 等待一段时间后关闭
sleep(10)
pool.shutdown
end
# thread_pool_example🔍 线程调试和监控
线程状态监控
ruby
# 线程监控工具
class ThreadMonitor
def self.list_threads
puts "=== 当前线程列表 ==="
Thread.list.each_with_index do |thread, index|
puts "#{index + 1}. 线程ID: #{thread.object_id}"
puts " 状态: #{thread.status}"
puts " 是否存活: #{thread.alive?}"
puts " 优先级: #{thread.priority}"
puts " 组: #{thread.group}"
puts " 键: #{thread.keys}" unless thread.keys.empty?
puts
end
end
def self.thread_stats
threads = Thread.list
alive_count = threads.count(&:alive?)
sleeping_count = threads.count { |t| t.status == 'sleep' }
{
total: threads.length,
alive: alive_count,
dead: threads.length - alive_count,
sleeping: sleeping_count
}
end
def self.kill_all_except_main
Thread.list.each do |thread|
next if thread == Thread.main
thread.kill if thread.alive?
end
end
end
# 使用线程监控
def thread_monitoring_example
threads = []
3.times do |i|
threads << Thread.new do
puts "线程#{i}开始"
sleep(5)
puts "线程#{i}结束"
end
end
# 显示线程信息
ThreadMonitor.list_threads
puts "线程统计: #{ThreadMonitor.thread_stats}"
sleep(2)
threads.each(&:join)
end
# thread_monitoring_example死锁检测
ruby
# 死锁检测示例
require 'thread'
class DeadlockDetector
def initialize
@mutexes = {}
@mutex = Mutex.new
end
def register_mutex(mutex, name)
@mutex.synchronize do
@mutexes[mutex.object_id] = {
name: name,
owner: nil,
waiters: []
}
end
end
def acquire(mutex, timeout = 5)
# 简化的死锁检测逻辑
mutex.lock
end
end
# 安全的互斥锁包装器
class SafeMutex
def initialize(name)
@mutex = Mutex.new
@name = name
end
def synchronize
start_time = Time.now
@mutex.lock
yield
ensure
@mutex.unlock
end
end🎯 多线程最佳实践
1. 避免共享状态
ruby
# 不好的做法:共享可变状态
@shared_counter = 0
@shared_array = []
# 好的做法:使用线程安全的数据结构
require 'thread'
@mutex = Mutex.new
@thread_safe_array = Queue.new
# 或者使用不可变数据
class ImmutableData
attr_reader :data
def initialize(data)
@data = data.freeze
end
def update(new_data)
self.class.new(new_data)
end
end2. 正确处理异常
ruby
# 线程异常处理
def safe_thread_example
thread = Thread.new do
begin
# 可能出错的代码
raise "模拟错误"
rescue => e
puts "线程内捕获异常: #{e.message}"
# 记录日志或处理错误
ensure
puts "线程清理工作"
end
end
# 设置线程异常处理器
thread.abort_on_exception = false # 防止异常终止整个程序
thread.join
# 检查线程是否正常结束
if thread.status.nil?
puts "线程正常结束"
elsif thread.status == false
puts "线程异常终止"
end
end
# safe_thread_example3. 资源管理
ruby
# 线程安全的资源管理
class ResourceManager
def initialize
@resources = Queue.new
@mutex = Mutex.new
@created_count = 0
end
def acquire
resource = @resources.pop(true) rescue nil
if resource.nil?
@mutex.synchronize do
if @created_count < 10 # 最大资源数
resource = create_resource
@created_count += 1
end
end
end
resource
end
def release(resource)
@resources << resource
end
private
def create_resource
# 创建资源的逻辑
Object.new
end
end
# 使用资源管理器
def resource_management_example
manager = ResourceManager.new
threads = []
15.times do |i|
threads << Thread.new do
resource = manager.acquire
if resource
puts "线程#{i}获得资源"
sleep(rand(1..3))
manager.release(resource)
puts "线程#{i}释放资源"
else
puts "线程#{i}无法获得资源"
end
end
end
threads.each(&:join)
end
# resource_management_example4. 测试多线程代码
ruby
# 多线程代码测试示例
require 'test/unit'
class ThreadSafeCounter
def initialize
@count = 0
@mutex = Mutex.new
end
def increment
@mutex.synchronize { @count += 1 }
end
def value
@mutex.synchronize { @count }
end
end
class TestThreadSafeCounter < Test::Unit::TestCase
def test_concurrent_increment
counter = ThreadSafeCounter.new
threads = []
# 创建多个线程同时增加计数器
10.times do
threads << Thread.new do
100.times { counter.increment }
end
end
# 等待所有线程完成
threads.each(&:join)
# 验证结果
assert_equal 1000, counter.value
end
def test_thread_safety_under_load
counter = ThreadSafeCounter.new
threads = []
# 大量并发操作
50.times do
threads << Thread.new do
rand(50..100).times { counter.increment }
end
end
threads.each(&:join)
# 结果应该在预期范围内
assert counter.value > 0
assert counter.value <= 5000
end
end📚 下一步学习
掌握了Ruby多线程编程后,建议继续学习:
- Ruby JSON - 学习JSON数据处理
- Ruby 参考手册及学习资源 - 获取更多学习资料
- Ruby 数据库访问 - 学习数据库操作
- Ruby Web服务 - 深入Web开发
继续您的Ruby学习之旅吧!