Thread pools behaving oddly with max_queue and remaining_capacity #684

Open
sudoremo opened this issue Dec 6, 2017 · 6 comments
Labels
question An user question, does not change the library.

Comments

sudoremo commented Dec 6, 2017

We need a thread pool that has a maximum number of parallel threads, no queue, and that fails if too many work units are posted. We also need to be able to detect how many threads are "free" at a given time.

We've tried almost all of the ThreadPool and ThreadPoolExecutor classes but can't get it to work:

require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 0,
  max_threads: 5,
  max_queue: 0,
  fallback_policy: :abort,
  auto_terminate: false
)

6.times do
  pool.post { sleep 5 }
  puts pool.remaining_capacity
end

pool.wait_for_termination

If we interpret the documentation correctly, this should create a thread pool that starts with 0 threads and can grow up to 5 threads running in parallel. If all 5 threads are occupied, posting new work units should result in an exception; there should be no additional queue. We'd also expect remaining_capacity to drop by 1 each time a work unit is posted while the previous ones are still busy.

The output of the above script:

-1
-1
-1
-1
-1
-1

Expected would be:

4
3
2
1
0
=> EXCEPTION

Did we misinterpret the documentation, and how can we achieve what we need? We also need to be able to detect the number of idle threads so that we can post the right number of work units.

On a side-note: The above example stops with a number of exceptions like the following:

/usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:43:in `sleep': No live threads left. Deadlock? (fatal)
6 threads, 6 sleeps current:0x00007fa6936e27d0 main thread:0x00007fa6937044c0
* #<Thread:0x00007fa69487f0b8 sleep_forever>
   rb_thread_t:0x00007fa6937044c0 native:0x00007fff7b53e000 int:1
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:43:in `sleep'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:43:in `wait'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:43:in `ns_wait'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/abstract_lockable_object.rb:43:in `ns_wait_until'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/atomic/event.rb:87:in `block in wait'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:38:in `block in synchronize'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:38:in `synchronize'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/synchronization/mri_lockable_object.rb:38:in `synchronize'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/atomic/event.rb:84:in `wait'
   /usr/local/lib/ruby/gems/2.4.0/gems/concurrent-ruby-1.0.5/lib/concurrent/executor/ruby_executor_service.rb:49:in `wait_for_termination'
   concurrent_test.rb:16:in `<main>'

I currently have no idea what this is about, but this second problem does not happen in our application and is not a dealbreaker.

Thanks a bunch for having a look at the first question though - this would help us a lot. Keep up your wonderful work!


  • Operating system: mac
  • concurrent-ruby version: 1.0.5
  • concurrent-ruby-ext installed: no
  • concurrent-ruby-edge used: no
sudoremo commented Dec 11, 2017

May I bump this question again? We'd really need a solution for this and are kind of stuck. Thanks a lot for having a quick look at this and sorry for bothering you again.

@pitr-ch pitr-ch added the question An user question, does not change the library. label Feb 21, 2018
pitr-ch commented Feb 21, 2018

Sorry, I had no free time left and could not attend to concurrent-ruby for a while. Unfortunately, the thread pool uses max_queue: 0 to make the queue unbounded. remaining_capacity returns -1 to indicate that the queue's capacity is unbounded; it is not related to the number of free threads. The pool can be queried for the number of threads, but not for the number of idle threads.
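The difference between queue capacity and free threads can be illustrated with a plain-Ruby sketch of these semantics (an illustration only, not concurrent-ruby's actual implementation):

```ruby
# Sketch of remaining_capacity semantics: max_queue == 0 means "unbounded",
# reported as -1; otherwise the value counts free *queue* slots, not threads.
def remaining_capacity(max_queue, queue_length)
  return -1 if max_queue.zero?  # unbounded queue
  max_queue - queue_length      # free slots left in the bounded queue
end
```

With max_queue: 0 the value is always -1, which matches the output of the original script.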

What's your use case for this requirement, if you don't mind me asking? I may be able to suggest an alternative approach.

sudoremo commented

Thanks a lot for your answer. The reason for this requirement is the workhorse gem. It has a (concurrent-ruby) queue of jobs and regularly polls the database for new jobs, fetching only as many jobs as the queue can still handle. So for instance, if we have a queue size of 5 and 3 jobs are active, the poller only fetches 2 jobs from the database.
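The sizing rule described above boils down to simple arithmetic; a minimal sketch (jobs_to_fetch is an illustrative name, not workhorse's actual API):

```ruby
# Free slots the poller may fill, never negative.
# E.g. a queue size of 5 with 3 active jobs -> fetch 2 jobs.
def jobs_to_fetch(queue_size, active_jobs)
  [queue_size - active_jobs, 0].max
end
```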

To work around this issue, we've now created a wrapper around the ThreadPoolExecutor that does the job. If you feel it would make sense to include this in your gem, though, we'd be more than happy to switch over to it.
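For reference, such a wrapper can be approximated with the Ruby standard library alone. The sketch below is hypothetical (BoundedPool, post and remaining are made-up names, not workhorse's actual code): it runs at most max tasks concurrently, raises when full, and reports free slots.

```ruby
# Hypothetical "no queue" pool: at most +max+ tasks run at once,
# extra posts raise Rejected, and +remaining+ reports free slots.
class BoundedPool
  class Rejected < StandardError; end

  def initialize(max)
    @max = max
    @active = 0
    @lock = Mutex.new
  end

  # Number of work units that can still be posted right now.
  def remaining
    @lock.synchronize { @max - @active }
  end

  # Runs the block on a fresh thread; raises Rejected if the pool is full.
  def post(&task)
    @lock.synchronize do
      raise Rejected, 'pool is full' if @active >= @max
      @active += 1
    end
    Thread.new do
      begin
        task.call
      ensure
        @lock.synchronize { @active -= 1 }
      end
    end
  end
end
```

A production version would also need to reuse threads and handle shutdown, which is exactly what ThreadPoolExecutor already does internally.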

xsresearch commented

@remofritzsche If you are looking to determine how many threads are available in the pool, you can apply the following patch to concurrent-ruby:

module Concurrent
  class RubyThreadPoolExecutor < RubyExecutorService
    def ready_worker_count
      synchronize do
        c1 = @max_length - @pool.length  # Number of workers still to be created
        c2 = @ready.length               # Workers created but waiting
      return c1 + c2
      end
    end
  end
end

Then just replace pool.remaining_capacity with pool.ready_worker_count in your example (and set max_queue to -1 instead of 0) and you will achieve the desired behaviour.


sudoremo commented Oct 8, 2018

Great, thank you. Are there any plans to add this to the official source? As we're developing a gem ourselves, we don't want to patch other gems if we can avoid it. Thanks :)

bensheldon commented

I think the correct way to have no queue (e.g. run the fallback if all threads are active) is to use max_queue: 0, synchronous: true:

Concurrent::ThreadPoolExecutor.new(max_queue: 0, synchronous: true)

You can see this behavior is asserted in the tests:

it 'executes fallback policy once max_threads has been reached' do

And this is the implementation in Ruby:

# tries to enqueue task
# @return [true, false] if enqueued
#
# @!visibility private
def ns_enqueue(*args, &task)
  return false if @synchronous
