Skip to content

Commit 314d082

Browse files
committed
Fix race between stop and task exiting.
1 parent 998288e commit 314d082

File tree

4 files changed

+59
-1
lines changed

4 files changed

+59
-1
lines changed

lib/async/barrier.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,16 @@ def size
4343
# Execute a child task and add it to the barrier.
4444
# @asynchronous Executes the given block concurrently.
4545
def async(*arguments, parent: (@parent or Task.current), **options, &block)
46+
raise "Barrier is stopped!" if @finished.closed?
47+
4648
waiting = nil
4749

4850
parent.async(*arguments, **options) do |task, *arguments|
4951
waiting = TaskNode.new(task)
5052
@tasks.append(waiting)
5153
block.call(task, *arguments)
5254
ensure
53-
@finished.signal(waiting)
55+
@finished.signal(waiting) unless @finished.closed?
5456
end
5557
end
5658

lib/async/queue.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ def initialize(parent: nil, available: Notification.new)
3232
@available = available
3333
end
3434

35+
# @returns [Boolean] Whether the queue is closed.
36+
def closed?
37+
@closed
38+
end
39+
3540
# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
3641
def close
3742
@closed = true

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Fix race condition between `Async::Barrier#stop` and finish signalling.
6+
37
## v2.28.0
48

59
- Use `Traces.current_context` and `Traces.with_context` for better integration with OpenTelemetry.

test/async/barrier.rb

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,53 @@
197197
task2.wait
198198
task3.wait
199199
end
200+
201+
it "can stop tasks with ensure blocks without raising errors" do
202+
ensure_block_executed = false
203+
204+
task = barrier.async do |task|
205+
sleep(10) # This will be interrupted by stop
206+
ensure
207+
# This ensure block should execute without causing ClosedError
208+
sleep(0.01) # Simulate cleanup work
209+
ensure_block_executed = true
210+
end
211+
212+
# Stop the barrier while the task is running
213+
barrier.stop
214+
215+
# Wait for the task to complete (it should be stopped)
216+
task.wait
217+
218+
expect(task).to be(:stopped?)
219+
expect(ensure_block_executed).to be == true
220+
end
221+
222+
it "can stop multiple tasks with ensure blocks simultaneously" do
223+
ensure_counts = []
224+
tasks = []
225+
226+
# Create multiple tasks with ensure blocks
227+
3.times do |i|
228+
tasks << barrier.async do |task|
229+
sleep(10) # This will be interrupted by stop
230+
ensure
231+
# Each ensure block should execute without causing ClosedError
232+
sleep(0.01) # Simulate cleanup work
233+
ensure_counts << i
234+
end
235+
end
236+
237+
# Stop the barrier while tasks are running
238+
barrier.stop
239+
240+
# Wait for all tasks to complete
241+
tasks.each(&:wait)
242+
243+
# All tasks should be stopped and all ensure blocks executed
244+
tasks.each {|task| expect(task).to be(:stopped?)}
245+
expect(ensure_counts.sort).to be == [0, 1, 2]
246+
end
200247
end
201248

202249
with "semaphore" do

0 commit comments

Comments
 (0)