Skip to content

Commit

Permalink
Make the worker pool an optional feature that defaults to off.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Nov 27, 2024
1 parent 9449e6f commit 41a0c92
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 13 deletions.
38 changes: 38 additions & 0 deletions .github/workflows/test-worker-pool.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Test

on: [push, pull_request]

permissions:
contents: read

env:
CONSOLE_OUTPUT: XTerm
ASYNC_SCHEDULER_DEFAULT_WORKER_POOL: true

jobs:
test:
name: ${{matrix.ruby}} on ${{matrix.os}} / ASYNC_SCHEDULER_DEFAULT_WORKER_POOL=true
runs-on: ${{matrix.os}}-latest

strategy:
matrix:
os:
- ubuntu

ruby:
- head

steps:
- uses: actions/checkout@v3
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{matrix.ruby}}
bundler-cache: true

- name: Run tests
timeout-minutes: 10
run: bundle exec bake test

- name: Run external tests
timeout-minutes: 10
run: bundle exec bake test:external
27 changes: 14 additions & 13 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
module Async
# Handles scheduling of fibers. Implements the fiber scheduler interface.
class Scheduler < Node
DEFAULT_WORKER_POOL = ENV.fetch("ASYNC_SCHEDULER_DEFAULT_WORKER_POOL", nil).then do |value|
value == "true" ? true : nil
end

# Raised when an operation is attempted on a closed scheduler.
class ClosedError < RuntimeError
# Create a new error.
Expand All @@ -38,7 +42,7 @@ def self.supported?
# @public Since *Async v1*.
# @parameter parent [Node | Nil] The parent node to use for task hierarchy.
# @parameter selector [IO::Event::Selector] The selector to use for event handling.
def initialize(parent = nil, selector: nil)
def initialize(parent = nil, selector: nil, worker_pool: DEFAULT_WORKER_POOL)
super(parent)

@selector = selector || ::IO::Event::Selector.new(Fiber.current)
Expand All @@ -50,7 +54,15 @@ def initialize(parent = nil, selector: nil)
@idle_time = 0.0

@timers = ::IO::Event::Timers.new
@worker_pool = WorkerPool.new
if worker_pool == true
@worker_pool = WorkerPool.new
else
@worker_pool = worker_pool
end

if @worker_pool
self.singleton_class.prepend(WorkerPool::BlockingOperationWait)
end
end

# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
Expand Down Expand Up @@ -348,17 +360,6 @@ def process_wait(pid, flags)
return @selector.process_wait(Fiber.current, pid, flags)
end

# Wait for the given work to be executed.
#
# @public Since *Async v2.19* and *Ruby v3.4*.
# @asynchronous May be non-blocking.
#
# @parameter work [Proc] The work to execute on a background thread.
# @returns [Object] The result of the work.
def blocking_operation_wait(work)
@worker_pool.call(work)
end

# Run one iteration of the event loop.
#
# When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks.
Expand Down
13 changes: 13 additions & 0 deletions lib/async/worker_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@ module Async
#
# @private
class WorkerPool
module BlockingOperationWait
# Wait for the given work to be executed.
#
# @public Since *Async v2.19* and *Ruby v3.4*.
# @asynchronous May be non-blocking.
#
# @parameter work [Proc] The work to execute on a background thread.
# @returns [Object] The result of the work.
def blocking_operation_wait(work)
@worker_pool.call(work)
end
end

class Promise
def initialize(work)
@work = work
Expand Down

0 comments on commit 41a0c92

Please sign in to comment.