开发者

Threading in Ruby with a limit

I have a task I need to perform, do_stuff(opts), that will take ~1s each, even while 1 - 10 of them are running in parallel. I need to collect an array of the results for each operation at the end.

If I have 30 stuffs to do, how would I use threading effectively to queue up the do_stuff(opts) operations so no more than 10 are running concurrently, but the array of results is not given/printed/etc until all (30) tasks have been completed?

I usually have at least some code to try and illustrate what I mean, but with threading I'开发者_JAVA技巧m at a bit of a loss! Thanks in advance


I dunno how well it'll work for a more complex application, but I found something like this to work nicely for a simple threading scenario with macruby.

thread_limit = 4

threads = []
things_to_process.each do |thing|
  until threads.map { |t| t.status }.count("run") < thread_limit do sleep 5 end
  threads << Thread.new { the_task(thing) }
end
output = threads.map { |t| t.value }

the until loop waits around until there are less than the specified number of created threads running before allowing execution of the main thread to continue on to start the next thread.

the output variable will be assigned an array of the values returned by the_task with an ordering corresponding to the input array things_to_process. The main thread will block until every created thread returns a value.


You need to implement this pattern
This question discusses how that can be done in Ruby


If you're really after performance, you might also want to look into jruby.
It uses actual OS threads and not the green threads the other ruby implementations use


This solution gathers results in $results array. It allows 'thread_limit' threads to be created, then waits on them to complete before creating any more.

   $results = []

   def do_stuff(opts={})
     'done'
   end

   def thread_wait(threads)
     threads.each{|t| t.join}
     threads.each {|t| $results << t }
     threads.delete_if {|t| t.status == false}
     threads.delete_if {|t| t.status.nil? }
   end

   opts = {}
   thread_limit = 20
   threads = []
   records.each do |r|
     thread_wait(threads) while threads.length >= thread_limit
     t = Thread.new { do_stuff(opts) }
     t.abort_on_exception = true
     threads << t
   end
   # Ensure remaining threads complete
   threads.each{|t| t.join}


Also, take a look at this tutorial if you are new to Ruby threads.


I use parals and paralsmap:

def parals(objects, n: 50)
  objects.shuffle.each_slice(n).map do |g|
    print '{'
    threads = []
    g.map { |i| threads << Thread.new(i) { |i| yield(i) } }
    threads.each(&:join)
    print '}'
  end
end

def paralsmap(objects, n: 50)
  res = []

  objects.each_slice(n).map do |g|
    print '{'
    threads = []
    g.map { |i| threads << Thread.new(i, res) { |i| res << yield(i) } }
    threads.each(&:join)
    print '}'
  end

  res
end

e.g.:

parals((0..100).to_a) { |i| puts i }
urls = parals((0..100).to_a) { |i| "https://google.com/?q=#{i}" }

You can use the n parameter to limit the number of threads.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜