Skip to content

caller_runs execution should not block the work queue #939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,31 @@ def auto_terminate=(value)

private

# Handler which executes the `fallback_policy` once the queue size
# reaches `max_queue`.
# Returns an action which executes the `fallback_policy` once the queue
# size reaches `max_queue`. The reason for the indirection of an action
# is so that the work can be deferred outside of synchronization.
#
# @param [Array] args the arguments to the task which is being handled.
#
# @!visibility private
def handle_fallback(*args)
def fallback_action(*args)
case fallback_policy
when :abort
raise RejectedExecutionError
lambda { raise RejectedExecutionError }
when :discard
false
lambda { false }
when :caller_runs
begin
yield(*args)
rescue => ex
# let it fail
log DEBUG, ex
end
true
lambda {
begin
yield(*args)
rescue => ex
# let it fail
log DEBUG, ex
end
true
}
else
fail "Unknown fallback policy #{fallback_policy}"
lambda { fail "Unknown fallback policy #{fallback_policy}" }
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class JavaExecutorService < AbstractExecutorService

def post(*args, &task)
raise ArgumentError.new('no block given') unless block_given?
return handle_fallback(*args, &task) unless running?
return fallback_action(*args, &task).call unless running?
@executor.submit Job.new(args, task)
true
rescue Java::JavaUtilConcurrent::RejectedExecutionException
Expand Down
14 changes: 10 additions & 4 deletions lib/concurrent-ruby/concurrent/executor/ruby_executor_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ def initialize(*args, &block)

def post(*args, &task)
raise ArgumentError.new('no block given') unless block_given?
synchronize do
# If the executor is shut down, reject this task
return handle_fallback(*args, &task) unless running?
ns_execute(*args, &task)
deferred_action = synchronize {
if running?
ns_execute(*args, &task)
else
fallback_action(*args, &task)
end
}
if deferred_action
deferred_action.call
else
true
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,11 @@ def ns_execute(*args, &task)
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
@scheduled_task_count += 1
else
handle_fallback(*args, &task)
return fallback_action(*args, &task)
end

ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
nil
end

# @!visibility private
Expand Down
40 changes: 40 additions & 0 deletions spec/concurrent/executor/thread_pool_executor_shared.rb
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,46 @@
executor << proc { latch.count_down }
latch.wait(0.1)
end

specify '#post does not block other jobs running on the worker threads' do
log = Queue.new

# Using a custom instance
executor = described_class.new(
min_threads: 1,
max_threads: 1,
max_queue: 1,
fallback_policy: :caller_runs)

worker_unblocker = Concurrent::CountDownLatch.new(1)
executor_unblocker = Concurrent::CountDownLatch.new(1)
queue_done = Concurrent::CountDownLatch.new(1)

# Block the worker thread
executor << proc { worker_unblocker.wait }

# Fill the queue
executor << proc { log.push :queued; queue_done.count_down }

# Block in a caller_runs job
caller_runs_thread = Thread.new {
executor << proc { executor_unblocker.wait; log.push :unblocked }
}

# Wait until the caller_runs job is blocked
Thread.pass until caller_runs_thread.status == 'sleep'

# Now unblock the worker thread
worker_unblocker.count_down
queue_done.wait
executor_unblocker.count_down

# Tidy up
caller_runs_thread.join

# We will see the queued jobs run before the caller_runs job unblocks
expect([log.pop, log.pop]).to eq [:queued, :unblocked]
end
end
end
end