Skip to content

Commit 36fe6f7

Browse files
committed
Add a concurrency primitive for waiting for a specific number of tasks to complete.
1 parent d68e5b6 commit 36fe6f7

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

lib/async/waiter.rb

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
2+
module Async
3+
# A composable synchronization primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore} and/or {Barrier}.
4+
class Waiter
5+
def initialize(parent: nil, finished: Async::Condition.new)
6+
@finished = finished
7+
@done = []
8+
9+
@parent = parent
10+
end
11+
12+
def async(parent: (@parent or Task.current), &block)
13+
parent.async do |task|
14+
yield(task)
15+
ensure
16+
@done << task
17+
@finished.signal
18+
end
19+
end
20+
21+
def wait_for(count)
22+
while @done.size < count
23+
@finished.wait
24+
end
25+
26+
return @done.shift(count)
27+
end
28+
29+
def wait(count, exception: false)
30+
wait_for(count).map(&:result)
31+
end
32+
end
33+
end

test/async/waiter.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
2+
require 'async/waiter'
3+
require 'sus/fixtures/async'
4+
5+
describe "Async::Waiter" do
6+
include Sus::Fixtures::Async::ReactorContext
7+
8+
it "can wait for a subset of tasks" do
9+
waiter = Async::Waiter.new
10+
11+
3.times do
12+
waiter.async do
13+
sleep(rand * 0.01)
14+
end
15+
end
16+
17+
done = waiter.wait(2)
18+
expect(done.size).to be == 2
19+
20+
done = waiter.wait(1)
21+
expect(done.size).to be == 1
22+
end
23+
end

0 commit comments

Comments
 (0)