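Deadlock in ThreadPool
I couldn't find a decent ThreadPool implementation for Ruby, so I wrote my own (partly based on the code here: http://snippets.dzone.com/posts/show/3276, but changed to use wait/signal and extended with ThreadPool shutdown). However, after running for a while (with 100 threads and about 1300 tasks processed), it dies with a deadlock on line 25, where a worker waits for a new job. Any ideas why this happens?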
    require 'thread'
    begin
      require 'fastthread'
    rescue LoadError
      $stderr.puts "Using the ruby-core thread implementation"
    end

    class ThreadPool
      class Worker
        def initialize(callback)
          @mutex = Mutex.new
          @cv = ConditionVariable.new
          @callback = callback
          @mutex.synchronize {@running = true}
          @thread = Thread.new do
            while @mutex.synchronize {@running}
              block = get_block
              if block
                block.call
                reset_block
                # Signal the ThreadPool that this worker is ready for another job
                @callback.signal
              else
                # Wait for a new job
                @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
              end
            end
          end
        end

        def name
          @thread.inspect
        end

        def get_block
          @mutex.synchronize {@block}
        end

        def set_block(block)
          @mutex.synchronize do
            raise RuntimeError, "Thread already busy." if @block
            @block = block
            # Signal the thread in this class, that there's a job to be done
            @cv.signal
          end
        end

        def reset_block
          @mutex.synchronize {@block = nil}
        end

        def busy?
          @mutex.synchronize {!@block.nil?}
        end

        def stop
          @mutex.synchronize {@running = false}
          # Signal the thread not to wait for a new job
          @cv.signal
          @thread.join
        end
      end

      attr_accessor :max_size

      def initialize(max_size = 10)
        @max_size = max_size
        @workers = []
        @mutex = Mutex.new
        @cv = ConditionVariable.new
      end

      def size
        @mutex.synchronize {@workers.size}
      end

      def busy?
        @mutex.synchronize {@workers.any? {|w| w.busy?}}
      end

      def shutdown
        @mutex.synchronize {@workers.each {|w| w.stop}}
      end
      alias :join :shutdown

      def process(block=nil,&blk)
        block = blk if block_given?
        while true
          @mutex.synchronize do
            worker = get_worker
            if worker
              return worker.set_block(block)
            else
              # Wait for a free worker
              @cv.wait(@mutex)
            end
          end
        end
      end

      # Used by workers to report ready status
      def signal
        @cv.signal
      end

      private
      def get_worker
        free_worker || create_worker
      end

      def free_worker
        @workers.each {|w| return w unless w.busy?}; nil
      end

      def create_worker
        return nil if @workers.size >= @max_size
        worker = Worker.new(self)
        @workers << worker
        worker
      end
    end
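Solution
Answer
I'm slightly biased here, but I would suggest modelling this in a process language and model checking it. Freely available tools include, for example, the mCRL2 toolset (which uses an ACP-based language), the Mobility Workbench (pi-calculus), and Spin (PROMELA).
Otherwise, I would suggest removing every piece of code that is not essential to the problem and finding the smallest case in which the deadlock occurs. I doubt that 100 threads and 1300 tasks are essential to reach a deadlock. With a smaller case, adding a few debug prints will probably give enough information to solve the problem.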
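One way to make a shrunken test case fail visibly instead of hanging is a small watchdog around the workload. This is only a sketch: the helper name finishes_within? and the timeout values are hypothetical and not part of the original code.

```ruby
require 'thread'

# Hypothetical helper (not from the original post): runs the block in its own
# thread and returns true if it finished within `timeout` seconds.
def finishes_within?(timeout)
  worker = Thread.new { yield }
  # Thread#join(limit) returns nil when the limit expires before the thread ends
  finished = !worker.join(timeout).nil?
  worker.kill unless finished
  finished
end

# Stand-in workload; in practice the block would build a small pool,
# submit a handful of jobs and shut the pool down.
puts finishes_within?(1) { sleep 0.01 }
```

Shrinking the pool size and job count until the helper still reports a timeout gives a case small enough to trace with debug prints.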
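Answer
OK, so the problem seems to be in the ThreadPool#signal method. What may happen is:
1. All the workers are busy and you try to process a new job.
2. Line 90 (worker = get_worker) gets a nil worker.
3. A worker is freed and signals it, but the signal is lost because the ThreadPool is not yet waiting for it.
4. You reach line 95 (@cv.wait(@mutex) in process) and wait, even though a worker is free.
The error here is that a free worker can be signalled even when nobody is listening. If the signal is sent while holding the pool's mutex, it can only fire before process checks for a free worker or while process is already waiting, so it is never lost. The ThreadPool#signal method should be: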
    def signal
      @mutex.synchronize { @cv.signal }
    end
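The general pattern behind this fix can be shown in isolation. The sketch below is not the pool's code: ReadyFlag is a hypothetical class that keeps the condition in a variable guarded by the same mutex used for signalling, and re-checks it in a loop, so a signal sent before anyone waits is not lost.

```ruby
require 'thread'

# Hypothetical illustration class, not part of the ThreadPool above.
class ReadyFlag
  def initialize
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @ready = false
  end

  # Record the event and signal while holding the mutex.
  def set
    @mutex.synchronize do
      @ready = true
      @cv.signal
    end
  end

  # Wait only while the condition is false; the check and the wait
  # happen under the same mutex, so no signal can slip in between.
  def wait
    @mutex.synchronize do
      @cv.wait(@mutex) until @ready
      @ready = false
    end
  end
end

flag = ReadyFlag.new
flag.set # the signal arrives before any thread is waiting
waiter = Thread.new { flag.wait; :woken }
p(waiter.join(2) ? waiter.value : :timed_out)
```

Because the waiter tests @ready before calling wait, the early set is still observed and the thread does not block.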
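The same problem exists in the Worker object. What might happen is:
1. The worker has just finished a job.
2. It checks (line 17, block = get_block) whether a job is waiting: there is none.
3. The thread pool sends a new job and signals it... but the signal is lost.
4. The worker waits for a signal, even though it is marked as busy.
You should change the initialize method to: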
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
              # Signal the ThreadPool that this worker is ready for another job
              @callback.signal
            else
              # Wait for a new job
              @cv.wait(@mutex)
            end
          end
        end
      end
    end
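Next, the Worker#get_block and Worker#reset_block methods should no longer be synchronized, because the worker thread now already holds the mutex when it calls them. That way, a block cannot be assigned to a worker between the test for a block and the wait for a signal.
Answer
OK, so the main problem with the implementation is: how do you make sure that no signal is lost and avoid deadlocks?
In my experience, this is really hard to achieve with condition variables and mutexes, but easy with semaphores. It so happens that Ruby provides an object called Queue (or SizedQueue) that should solve the problem. Here is my suggested implementation: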
    require 'thread'
    begin
      require 'fastthread'
    rescue LoadError
      $stderr.puts "Using the ruby-core thread implementation"
    end

    class ThreadPool
      class Worker
        def initialize(thread_queue)
          @mutex = Mutex.new
          @cv = ConditionVariable.new
          @queue = thread_queue
          @running = true
          @thread = Thread.new do
            @mutex.synchronize do
              while @running
                @cv.wait(@mutex)
                block = get_block
                if block
                  @mutex.unlock
                  block.call
                  @mutex.lock
                  reset_block
                end
                @queue << self
              end
            end
          end
        end

        def name
          @thread.inspect
        end

        def get_block
          @block
        end

        def set_block(block)
          @mutex.synchronize do
            raise RuntimeError, "Thread already busy." if @block
            @block = block
            # Signal the thread in this class, that there's a job to be done
            @cv.signal
          end
        end

        def reset_block
          @block = nil
        end

        def busy?
          @mutex.synchronize { !@block.nil? }
        end

        def stop
          @mutex.synchronize do
            @running = false
            @cv.signal
          end
          @thread.join
        end
      end

      attr_accessor :max_size

      def initialize(max_size = 10)
        @max_size = max_size
        @queue = Queue.new
        @workers = []
      end

      def size
        @workers.size
      end

      def busy?
        @queue.size < @workers.size
      end

      def shutdown
        @workers.each { |w| w.stop }
        @workers = []
      end
      alias :join :shutdown

      def process(block=nil,&blk)
        block = blk if block_given?
        worker = get_worker
        worker.set_block(block)
      end

      private

      def get_worker
        if !@queue.empty? or @workers.size == @max_size
          return @queue.pop
        else
          worker = Worker.new(@queue)
          @workers << worker
          worker
        end
      end
    end
And here is some simple test code:
    tp = ThreadPool.new 500
    (1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
    tp.shutdown
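For comparison, Queue can also carry the jobs themselves rather than the idle workers, which removes the per-worker mutex and condition variable entirely. The following is a minimal sketch of that alternative design, not the implementation above; the class name QueuePool and the use of nil as a stop marker are assumptions made for illustration.

```ruby
require 'thread'

# Hypothetical alternative: a fixed set of threads pulling jobs from one Queue.
class QueuePool
  def initialize(size)
    @jobs = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until a job is available; nil means "stop"
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  def process(&block)
    @jobs << block
  end

  # Push one stop marker per thread, then wait for all of them to exit.
  # Jobs queued earlier are processed first because the queue is FIFO.
  def shutdown
    @threads.size.times { @jobs << nil }
    @threads.each(&:join)
  end
end

results = Queue.new
pool = QueuePool.new(4)
(1..20).each { |i| pool.process { results << i * i } }
pool.shutdown
puts results.size
```

Since Queue#pop does the blocking and the wakeup internally, there is no separate signal that could be missed. This sketch does not grow the pool lazily or handle exceptions raised by jobs.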
Answer
You could try the work_queue gem, which is designed to coordinate work between a producer and a pool of worker threads.