Worker threads get stuck when using caller_runs policy #933

Closed
jonmast opened this issue Jan 13, 2022 · 4 comments · Fixed by #939
Labels
bug A bug in the library or documentation. high-priority Should be done ASAP.

Comments

@jonmast

jonmast commented Jan 13, 2022

I'm attempting to use a threadpool to implement bounded concurrency, with the caller_runs fallback_policy adding back pressure. This doesn't work as I'd expect: once the queue is full and the fallback policy is triggered, all subsequent jobs are processed on the caller thread rather than being delegated to the workers.

Reproduction script:

```ruby
require 'concurrent-ruby'

tasks = (1..10).to_a

threadpool = Concurrent::FixedThreadPool.new(
  1,
  max_queue: 2,
  fallback_policy: :caller_runs
)

tasks.each do |t|
  threadpool.post do
    puts "Task #{t} running on '#{Thread.current.name || 'main'}'"
    sleep 1
    puts "Task #{t} complete"
  end
end

threadpool.shutdown
threadpool.wait_for_termination
```

Output:

```
Task 4 running on 'main'
Task 1 running on 'worker-1'
Task 1 complete
Task 4 complete
Task 5 running on 'main'
Task 5 complete
Task 6 running on 'main'
Task 6 complete
Task 7 running on 'main'
Task 7 complete
Task 8 running on 'main'
Task 8 complete
Task 9 running on 'main'
Task 9 complete
Task 10 running on 'main'
Task 10 complete
Task 2 running on 'worker-1'
Task 2 complete
Task 3 running on 'worker-1'
Task 3 complete
```

I suspect this is because the threadpool remains locked while the caller task is running, which prevents the worker threads from continuing. Adding a sleep between posts unblocks the pool and allows the worker threads to continue processing.

I don't know whether this is considered a bug, but I found it very surprising and can't see an obvious reason it needs to work this way. I'd expect the worker threads to be able to pull items from the queue regardless of how long the caller task blocks.
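A minimal plain-Ruby sketch of the suspected mechanism, assuming (as suspected above) that the caller_runs task executes while the pool's lock is held. This is not concurrent-ruby's code; `lock`, `jobs`, and the two threads are hypothetical stand-ins: a "caller" runs a slow task inside a `Mutex`, and a "worker" that needs the same mutex to dequeue its next job cannot proceed until the caller finishes.

```ruby
# Hypothetical illustration (not concurrent-ruby code): a single Mutex guards both
# a slow "caller runs" task and the dequeue step a worker needs to make progress.
lock = Mutex.new
jobs = [:queued_job]
log  = Queue.new

# Stands in for a caller_runs task executed while the pool's lock is held.
caller_thread = Thread.new do
  lock.synchronize do
    log << :caller_started
    sleep 0.5 # slow task running on the caller thread
    log << :caller_finished
  end
end

# Wait until the caller thread is sleeping inside the lock.
Thread.pass until caller_thread.status == 'sleep'

# Stands in for a worker that must take the same lock to pull its next job.
worker_thread = Thread.new do
  job = lock.synchronize { jobs.shift }
  log << job
end

[caller_thread, worker_thread].each(&:join)
events = Array.new(log.size) { log.pop }
p events # => [:caller_started, :caller_finished, :queued_job]
```

The worker's job is only logged after the caller releases the lock, even though the worker was ready long before that.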

* Operating system: linux and mac
* Ruby implementation: Ruby
* `concurrent-ruby` version: 1.1.9
* `concurrent-ruby-ext` installed: no
* `concurrent-ruby-edge` used: no
@chrisseaton
Member

Yes, it looks like the task is run while holding the lock:

```ruby
synchronize do
  # If the executor is shut down, reject this task
  return handle_fallback(*args, &task) unless running?
  ns_execute(*args, &task)
  true
end
```
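One way to avoid this, sketched here under stated assumptions, is to make only the enqueue-or-fallback decision under the lock and run the caller_runs task after releasing it. `ToyCallerRunsPool` is a hypothetical single-worker pool written for illustration; it is not concurrent-ruby's implementation and not the code from #939. The demo makes the caller_runs task wait for the worker to dequeue and finish a queued job, which can only succeed if the lock is free while the fallback runs.

```ruby
# Hypothetical toy pool (NOT concurrent-ruby's implementation): one worker,
# a bounded queue, and a caller_runs fallback that runs after the lock is released.
class ToyCallerRunsPool
  def initialize(max_queue)
    @lock      = Mutex.new
    @cond      = ConditionVariable.new
    @queue     = []
    @max_queue = max_queue
    @running   = true
    @worker    = Thread.new { work_loop }
  end

  def post(&task)
    # Only the enqueue-or-fallback decision happens under the lock.
    run_on_caller = @lock.synchronize do
      if @running && @queue.size < @max_queue
        @queue << task
        @cond.signal
        false
      else
        true # queue full (or pool shut down): the caller runs the task
      end
    end
    # The fallback runs with the lock released, so the worker can keep dequeuing.
    task.call if run_on_caller
  end

  def shutdown
    @lock.synchronize do
      @running = false
      @cond.broadcast
    end
    @worker.join
  end

  private

  def work_loop
    loop do
      task = @lock.synchronize do
        @cond.wait(@lock) while @queue.empty? && @running
        @queue.shift # nil only once shut down and drained
      end
      break if task.nil?
      task.call
    end
  end
end

log     = Queue.new
started = Queue.new
gate    = Queue.new
done    = Queue.new
pool    = ToyCallerRunsPool.new(1)

pool.post { started << true; gate.pop; log << :first } # occupies the worker
started.pop                                            # worker has dequeued it
pool.post { log << :queued; done << true }             # fills the one-slot queue
pool.post do                                           # queue full: runs on caller
  gate << :go # release the worker...
  done.pop    # ...and wait until it has dequeued and run the queued job
  log << :caller_runs
end

pool.shutdown
events = Array.new(log.size) { log.pop }
p events # => [:first, :queued, :caller_runs]
```

If `task.call` were moved inside the `synchronize` block in `post`, the worker could never take the lock to dequeue the queued job, and the demo would never complete.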

@chrisseaton chrisseaton added bug A bug in the library or documentation. high-priority Should be done ASAP. labels Mar 8, 2022
@chrisseaton
Member

```
Task 4 running on 'main'
Task 1 running on 'worker-1'
Task 4 complete
Task 5 running on 'main'
Task 1 complete
Task 2 running on 'worker-1'
Task 5 complete
Task 7 running on 'main'
Task 2 complete
Task 3 running on 'worker-1'
Task 7 complete
Task 9 running on 'main'
Task 3 complete
Task 6 running on 'worker-1'
Task 9 complete
Task 6 complete
Task 8 running on 'worker-1'
Task 8 complete
Task 10 running on 'worker-1'
Task 10 complete
```

That's what we want, isn't it?

@chrisseaton
Member

Here's a deterministic test.

```ruby
require 'concurrent-ruby'

log = Queue.new

executor = Concurrent::FixedThreadPool.new(1, max_queue: 2, fallback_policy: :caller_runs)

worker_unblocker = Concurrent::CountDownLatch.new(1)
executor_unblocker = Concurrent::CountDownLatch.new(1)

# Block the worker thread
executor << proc { worker_unblocker.wait }

# Fill the queue
executor << proc { log.push :queued }
executor << proc { log.push :queued }

t = Thread.new {
  # Block in a caller_runs job
  executor << proc { executor_unblocker.wait; log.push :unblocked }
}

# Wait until the caller_runs job is blocked
Thread.pass until t.status == 'sleep'

# Unblock the worker thread, then the blocked caller_runs job
worker_unblocker.count_down
executor_unblocker.count_down

executor.shutdown
executor.wait_for_termination

# Ideally, we will see the queued jobs run before the caller_runs job unblocks
p [log.pop, log.pop, log.pop]

# [:unblocked, :queued, :queued] bad
# [:queued, :queued, :unblocked] good
```

Phew, that was hard.

I have a PR that achieves that, if I get feedback that this is the desired behaviour.

@jonmast
Author

jonmast commented Mar 15, 2022

Sorry I'm so slow to follow up, but this is perfect. Thanks @chrisseaton!

joker1007 added a commit to joker1007/crono_trigger that referenced this issue Aug 20, 2022
MattFenelon added a commit to MattFenelon/graphiti that referenced this issue Jun 13, 2024
Includes fix

* Worker threads get stuck when using caller_runs policy #933 in ruby-concurrency/concurrent-ruby#933
2 participants