Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fixtures/async/a_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ module Async

waiting_task.wait
expect(waiting_task).to be(:finished?)
expect(queue).to be(:closed?)
end
end

Expand Down
4 changes: 3 additions & 1 deletion lib/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ def size
# Execute a child task and add it to the barrier.
# @asynchronous Executes the given block concurrently.
def async(*arguments, parent: (@parent or Task.current), **options, &block)
raise "Barrier is stopped!" if @finished.closed?

waiting = nil

parent.async(*arguments, **options) do |task, *arguments|
waiting = TaskNode.new(task)
@tasks.append(waiting)
block.call(task, *arguments)
ensure
@finished.signal(waiting)
@finished.signal(waiting) unless @finished.closed?
end
end

Expand Down
5 changes: 5 additions & 0 deletions lib/async/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ def initialize(parent: nil, available: Notification.new)
@available = available
end

# @returns [Boolean] Whether the queue is closed.
def closed?
@closed
end

# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
def close
@closed = true
Expand Down
4 changes: 4 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Releases

## Unreleased

- Fix race condition between `Async::Barrier#stop` and finish signalling.

## v2.28.0

- Use `Traces.current_context` and `Traces.with_context` for better integration with OpenTelemetry.
Expand Down
47 changes: 47 additions & 0 deletions test/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,53 @@
task2.wait
task3.wait
end

it "can stop tasks with ensure blocks without raising errors" do
ensure_block_executed = false

task = barrier.async do |task|
sleep(10) # This will be interrupted by stop
ensure
# This ensure block should execute without causing ClosedError
sleep(0.01) # Simulate cleanup work
ensure_block_executed = true
end

# Stop the barrier while the task is running
barrier.stop

# Wait for the task to complete (it should be stopped)
task.wait

expect(task).to be(:stopped?)
expect(ensure_block_executed).to be == true
end

it "can stop multiple tasks with ensure blocks simultaneously" do
ensure_counts = []
tasks = []

# Create multiple tasks with ensure blocks
3.times do |i|
tasks << barrier.async do |task|
sleep(10) # This will be interrupted by stop
ensure
# Each ensure block should execute without causing ClosedError
sleep(0.01) # Simulate cleanup work
ensure_counts << i
end
end

# Stop the barrier while tasks are running
barrier.stop

# Wait for all tasks to complete
tasks.each(&:wait)

# All tasks should be stopped and all ensure blocks executed
tasks.each {|task| expect(task).to be(:stopped?)}
expect(ensure_counts.sort).to be == [0, 1, 2]
end
end

with "semaphore" do
Expand Down
Loading