Unexpected pruning behaviour with consecutive task batches #1066

Open
@joshuay03

Description

* Operating system:                macOS Sequoia 15.0.1
* Ruby implementation:             ruby 3.4.0preview1
* `concurrent-ruby` version:       1.3.3
* `concurrent-ruby-ext` installed: no
* `concurrent-ruby-edge` used:     no

Cross-post of rails/rails#53211 to open up a discussion, specifically about the part of the script where a second consecutive 'batch' of work is assigned to a `Concurrent::ThreadPoolExecutor` pool and the pool is incorrectly pruned before processing has begun:

   begin
     require "concurrent-ruby"
     
     pool = Concurrent::ThreadPoolExecutor.new(
       min_threads: 1,
       max_threads: 4,
       max_queue: 0,
       idletime: 3
     )
     # The first thread is spawned lazily, so the pool starts empty.
     puts pool.length #=> 0
     
     work = -> { sleep 2 }
     
     # Batch (gap-less individual units) of work.
     10.times { pool << work }
     # Wait for state updates.
     sleep 0.25
     # Expected scale up.
     puts pool.length #=> 4
     # Wait for all work to be processed.
     # This is sufficient because the work is I/O bound and runs in parallel.
     sleep 10
     puts pool.length #=> 4
     # Wait until idle time of all threads has elapsed.
     # This is sufficient; it only needs to be greater than the idle time of the last busy thread.
     sleep 5
     # Not scaled down.
     # Pruning only takes place when the next unit of work is received, even though the idle time has elapsed.
     puts pool.length #=> 4

     # Wait for a while to show no change.
     sleep 20
     puts pool.length #=> 4
     
     # Another batch of work.
     10.times { pool << work }
     # Wait for state updates.
     sleep 0.25
     # This case is the most interesting, and might need to be addressed in concurrent-ruby.
     # If bulk work comes in while the pool is scaled up, there's a race condition: prune is
     # called right after assignment / queuing, so the ready-worker count is checked before the
     # threads have started processing the work, which is when that count is updated. As a
     # result, we end up with a single thread handling all the work, i.e. the pool is prematurely
     # scaled down, and it stays that way since all units of work have already been assigned /
     # queued. (See the simplified sketch after the script.)
     puts "pool should ideally be scaled up here"
     puts pool.length #=> 1
     # Wait for all work to be processed.
     # Work is now sequential.
     sleep 25
     puts pool.length #=> 1
     # Wait until idle time of all threads has elapsed.
     sleep 5
     puts pool.length #=> 1

     # Wait for a while to show no change.
     sleep 20
     puts pool.length #=> 1
     
     # Individual units of work, spaced apart.
     # No work will be completed by the time the last unit is added.
     1.times { pool << work }
     sleep 0.25
     1.times { pool << work }
     sleep 0.25
     1.times { pool << work }
     sleep 0.25
     1.times { pool << work }
     # Wait for state updates.
     sleep 0.25
     # Expected scale up.
     puts pool.length #=> 4
     # Wait for all work to be processed.
     sleep 10
     puts pool.length #=> 4
     # Wait until idle time of all threads has elapsed.
     sleep 5
     # Once again, it won't scale down until the next unit of work arrives.
     puts pool.length #=> 4

     # Wait for a while to show no change.
     sleep 20
     puts pool.length #=> 4
     
     # Single unit of work.
     1.times { pool << work }
     # Wait for state updates.
     sleep 0.25 
     # Expected scale down: pruning runs now that new work has arrived.
     puts pool.length #=> 1
     # Wait for all work to be processed.
     sleep 10
     puts pool.length #=> 1
     # Wait until idle time of all threads has elapsed.
     sleep 5
     puts pool.length #=> 1
   end
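
For discussion, here's a deliberately simplified sketch of the check-then-act ordering I suspect is at play. It is not concurrent-ruby's implementation (the single worker thread and the variable names are hypothetical); it only shows that a prune decision made immediately after queuing a batch can still observe a stale "last activity" timestamp, because the worker hasn't yet been scheduled to dequeue the work and refresh it:

    # Illustrative only -- not concurrent-ruby internals.
    # A worker refreshes its "last activity" timestamp only once it actually
    # dequeues a task, so a prune check that runs right after queuing a batch
    # can still see the stale timestamp and treat the worker as idle.
    idletime = 3
    queue    = Queue.new
    now      = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

    last_activity = now.call - 10 # worker has been idle for longer than idletime

    worker = Thread.new do
      loop do
        task = queue.pop
        last_activity = now.call # refreshed only once the worker wakes up
        task.call
      end
    end

    # Post a batch, then check for pruning immediately, mirroring
    # "prune is called right after assignment / queuing".
    10.times { queue << -> { sleep 2 } }

    if now.call - last_activity > idletime
      # Usually reached: the main thread gets here before the worker thread is
      # scheduled, so the worker looks prunable despite the full queue.
      puts "would prune worker; queued tasks: #{queue.size}"
    end

    worker.kill

If the real pool hits the same window, the just-posted batch sits in the queue while the extra workers are stopped, leaving a single thread (min_threads) to drain it sequentially, which would match the pool.length #=> 1 readings above.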
