* Operating system: macOS Sequoia 15.0.1
* Ruby implementation: ruby 3.4.0preview1
* `concurrent-ruby` version: 1.3.3
* `concurrent-ruby-ext` installed: no
* `concurrent-ruby-edge` used: no
Cross-post of rails/rails#53211 to open up a discussion here. It is specifically about the part of the script where a second consecutive 'batch' of work is posted to a `Concurrent::ThreadPoolExecutor` and the pool is incorrectly pruned before processing has begun:
```ruby
begin
require "concurrent-ruby"
pool = Concurrent::ThreadPoolExecutor.new(
  min_threads: 1,
  max_threads: 4,
  max_queue: 0,
  idletime: 3
)
# First thread is lazily spawned.
puts pool.length #=> 0
work = -> { sleep 2 }
# A batch of work: individual units posted back to back, with no gaps.
10.times { pool << work }
# Wait for state updates.
sleep 0.25
# Expected scale up.
puts pool.length #=> 4
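# Optional extra observation (uses the public queue_length reader; the value is
# an estimate): the backlog waiting behind the 4 busy threads should be ~6 here.
puts pool.queue_length #=> ~6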
# Wait for all work to be processed.
# This is sufficient because the work is I/O-bound and runs in parallel.
sleep 10
puts pool.length #=> 4
# Wait until idle time of all threads has elapsed.
# This is sufficient; the wait only needs to exceed the idletime of the last thread that was busy.
sleep 5
# Not scaled down.
# Pruning only takes place when the next unit of work is received, even though the idle time has elapsed.
puts pool.length #=> 4
# Wait for a while to show no change.
sleep 20
puts pool.length #=> 4
# Another batch of work.
10.times { pool << work }
# Wait for state updates.
sleep 0.25
# This case is the most interesting one and might need to be addressed in concurrent-ruby.
# When bulk work comes in while the pool is scaled up, prune is called right after
# assignment / queuing, so there is a race between the moment the ready-worker count is
# checked for pruning and the moment the threads start processing the work (which is when
# that count is updated). As a result we end up with a single thread handling all the work,
# i.e. the pool is prematurely scaled down, and it stays that way since all units of work
# have already been assigned / queued.
puts "pool should ideally be scaled up here"
puts pool.length #=> 1
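# Optional extra observation (uses the public queue_length reader; the value is
# an estimate): the backlog is now carried by the single surviving thread, so
# the queue should hold roughly 9 pending units here.
puts pool.queue_length #=> ~9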
# Wait for all work to be processed.
# Work is now sequential.
sleep 25
puts pool.length #=> 1
# Wait until idle time of all threads has elapsed.
sleep 5
puts pool.length #=> 1
# Wait for a while to show no change.
sleep 20
puts pool.length #=> 1
# Individual units of work, spaced apart.
# No work will be completed by the time the last unit is added.
1.times { pool << work }
sleep 0.25
1.times { pool << work }
sleep 0.25
1.times { pool << work }
sleep 0.25
1.times { pool << work }
# Wait for state updates.
sleep 0.25
# Expected scale up.
puts pool.length #=> 4
# Wait for all work to be processed.
sleep 10
puts pool.length #=> 4
# Wait until idle time of all threads has elapsed.
sleep 5
# Once again, the pool won't scale down until the next unit of work arrives.
puts pool.length #=> 4
# Wait for a while to show no change.
sleep 20
puts pool.length #=> 4
# Single unit of work.
1.times { pool << work }
# Wait for state updates.
sleep 0.25
# Expected scale down.
puts pool.length #=> 1
# Wait for all work to be processed.
sleep 10
puts pool.length #=> 1
# Wait until idle time of all threads has elapsed.
sleep 5
puts pool.length #=> 1
end
```
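
For illustration, below is a minimal, self-contained model of the suspected race. It is not concurrent-ruby's implementation; the `ready` counter and the worker loop are stand-ins for the pool's internal ready-worker bookkeeping. It only shows that a prune-style check performed immediately after posting a batch can still observe all workers as idle, because the workers have not yet been scheduled to dequeue the new work.

```ruby
# Toy model of the race; NOT concurrent-ruby internals.
queue = Queue.new
mutex = Mutex.new
ready = 0 # stand-in for the pool's count of idle ("ready") workers

workers = 4.times.map do
  Thread.new do
    loop do
      mutex.synchronize { ready += 1 } # worker parks as idle
      job = queue.pop                  # blocks until work arrives
      mutex.synchronize { ready -= 1 } # worker becomes busy
      break if job == :stop
      job.call
    end
  end
end

sleep 0.1 # let all four workers park as idle

# Post a batch, then immediately inspect the idle count, the way a prune step
# running right after enqueue would.
10.times { queue << -> { sleep 0.2 } }
snapshot = mutex.synchronize { ready }
puts "idle workers observed right after posting: #{snapshot}" # typically 3 or 4

# The posting thread usually runs ahead of the workers, so a prune keyed off
# this snapshot would stop threads that are just about to dequeue the work,
# mirroring the premature scale-down in the report above.
sleep 0.3
puts "idle workers once they have woken up: #{mutex.synchronize { ready }}" # likely 0

4.times { queue << :stop } # queued behind the real jobs
workers.each(&:join)
```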