-
A barrier is supposed to be a synchronisation point for multiple tasks. However, I suppose what I was thinking was that if a task fails with an unhandled error, the entire process might fail. Order can matter, since tasks might have inter-dependencies. I don't mind changing this semantic, but we need to be mindful of which problems we are trying to solve and of the best way to organise the code and solutions.
-
It's common to start several tasks and wait for them all to complete. This is what a barrier handles. But if one of the tasks fails with an unhandled error, there is often no need to wait for the rest: we already know we won't get a complete result, because one of the tasks has failed. So we might want to handle this by, e.g., stopping all remaining tasks and starting over, failing the parent task, or aborting entirely. In general, I think handling errors is one of the most difficult aspects of using Async. My goal is to write fault-tolerant code. I'm currently inspired by Erlang/Elixir and supervisor trees, and the idea of restarting failed parts from a known good state.
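One way to picture the "restart from a known good state" idea is a group that fails fast, stops the remaining work, and restarts everything. A minimal sketch, assuming plain Ruby threads and Thread::Queue as stand-ins for Async tasks (this is not the async gem's API); `run_group` and `max_attempts` are hypothetical names:

```ruby
# Stdlib-only sketch: plain Ruby threads stand in for Async tasks; this is
# not the async gem's API. Each attempt starts every job; on the first
# failure the remaining threads are killed and the whole group restarts.
def run_group(jobs, max_attempts: 3)
  last_error = nil
  max_attempts.times do |attempt|
    results = Thread::Queue.new
    threads = jobs.each_with_index.map do |job, index|
      Thread.new do
        results << [index, :ok, job.call(attempt)]
      rescue StandardError => error
        results << [index, :error, error]
      end
    end

    begin
      values = Array.new(jobs.size)
      jobs.size.times do
        index, status, value = results.pop # blocks until some job finishes
        raise value if status == :error    # fail fast on the first error
        values[index] = value
      end
      return values
    rescue StandardError => error
      last_error = error                   # remember it, then restart
    ensure
      threads.each(&:kill)                 # stop whatever is still running
    end
  end
  raise last_error
end

# Usage: the first attempt fails fast, the second one succeeds.
flaky = ->(attempt) { raise 'transient failure' if attempt.zero?; :fetched }
steady = ->(_attempt) { sleep 0.05; :parsed }
p run_group([flaky, steady]) # => [:fetched, :parsed]
```

A real supervisor would presumably also want backoff and a clear definition of which state counts as "known good"; the sketch simply reruns every job from scratch.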
-
Perhaps you could choose whether the barrier should raise as soon as any task fails, or wait for all tasks to complete, ignoring errors.
I think the current behaviour is somewhere between these, but to me it seems a bit hard to use because it depends on the ordering of tasks.

Async do
  barrier = Async::Barrier.new
  barrier.async { raise 'ups' }
  barrier.async { sleep 1 }
  barrier.wait     # raise as soon as any task fails
  barrier.complete # wait for all tasks, ignoring errors
rescue StandardError => e
  # what task caused the error?
ensure
  barrier.stop
end

Task#wait raises if the task fails, so it seems appropriate that Barrier#wait does the same, but for all tasks. Is there a way to glean from the error which task originally raised it?
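The two proposed modes, plus an answer to "which task failed?", can be sketched by giving each task a name and wrapping failures in an error that records it. This assumes plain Ruby threads instead of Async tasks; `NamedGroup`, `TaskFailed`, and `complete` are hypothetical and not part of the async gem:

```ruby
# Hypothetical error type (not part of the async gem): records which named
# task failed, alongside the original exception.
class TaskFailed < StandardError
  attr_reader :task_name, :error

  def initialize(task_name, error)
    @task_name = task_name
    @error = error
    super("#{task_name} failed: #{error.message}")
  end
end

# Threads stand in for Async tasks. Each task pushes [name, value, error]
# onto a shared queue when it finishes, so outcomes arrive in completion order.
class NamedGroup
  def initialize
    @done = Thread::Queue.new
    @threads = {}
  end

  def async(name, &job)
    thread = Thread.new do
      @done << [name, job.call, nil]
    rescue StandardError => error
      @done << [name, nil, error]
    end
    @threads[name] = thread
  end

  # Fail fast: raise TaskFailed for the first task to fail.
  def wait
    @threads.size.times do
      name, _value, error = @done.pop
      raise TaskFailed.new(name, error) if error
    end
    nil
  end

  # Wait for every task; return { name => value or exception }.
  def complete
    @threads.size.times.with_object({}) do |_, outcomes|
      name, value, error = @done.pop
      outcomes[name] = error || value
    end
  end

  def stop
    @threads.each_value(&:kill)
  end
end

group = NamedGroup.new
group.async(:task1) { sleep 1; :slow }
group.async(:task2) { raise 'ups' }
begin
  group.wait
rescue TaskFailed => e
  puts e.message # prints "task2 failed: ups"
ensure
  group.stop
end
```

Because `wait` handles outcomes in completion order, a slow first task does not delay reporting a failure from a later one. In this sketch only one of `wait` or `complete` should be called per group, since both consume the same queue.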
-
I tried this implementation:

def wait
  condition = Async::Condition.new
  guard = Async do
    until @tasks.empty?
      result = condition.wait
      raise result if result.is_a? StandardError
    end
  end
  @tasks.each do |waiting|
    Async do
      begin
        task = waiting.task
        task.wait
      ensure
        @tasks.remove?(waiting) unless task.alive?
      end
      condition.signal :ok
    rescue StandardError => e
      condition.signal e
    end
  end
  guard.wait
end

Now the barrier will abort and re-raise as soon as any task fails:

require 'async'
require 'async/barrier'

Async do
  barrier = Async::Barrier.new
  barrier.async do |task1|
    task1.annotate(:task1)
    sleep 1000
  end
  barrier.async do |task2|
    task2.annotate(:task2)
    sleep 0.1
    raise RuntimeError, 'boom!' # must be raised, not just instantiated
  end
  barrier.wait
  puts 'All tasks completed'
rescue StandardError => e
  puts "Task error: #{e}"
ensure
  barrier.stop
end

Instead of waiting for each task in turn, we run a separate guard task that waits on a condition for as long as tasks remain. When a task completes or fails, it signals the condition and removes itself. The guard can then abort the wait if a task failed. But having to wrap each task in another task feels clunky; is there a better way to do it?
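The per-task wrapper can perhaps be avoided by letting each task report its own completion. A minimal sketch of the same guard-and-condition idea, assuming Ruby threads, Mutex, and ConditionVariable in place of Async tasks and Async::Condition; `FailFastBarrier` is a hypothetical class, not the async gem's Barrier:

```ruby
# Threads, Mutex and ConditionVariable stand in for Async tasks and
# Async::Condition. Each task deregisters itself and signals when it ends;
# wait sleeps until every task is gone or one of them has failed.
class FailFastBarrier
  def initialize
    @mutex = Mutex.new
    @changed = ConditionVariable.new
    @pending = []
    @failure = nil
  end

  def async(&job)
    # Hold the lock while registering, so a task that finishes instantly
    # cannot deregister itself before it has been added to @pending.
    @mutex.synchronize do
      thread = Thread.new do
        job.call
      rescue StandardError => error
        @mutex.synchronize { @failure ||= error } # keep the first failure
      ensure
        @mutex.synchronize do
          @pending.delete(Thread.current)
          @changed.broadcast
        end
      end
      @pending << thread
    end
  end

  # Returns once all tasks have finished; raises as soon as one has failed.
  def wait
    @mutex.synchronize do
      @changed.wait(@mutex) until @pending.empty? || @failure
      raise @failure if @failure
    end
  end

  def stop
    @mutex.synchronize { @pending.dup }.each(&:kill)
  end
end

barrier = FailFastBarrier.new
barrier.async { sleep 5 }
barrier.async { sleep 0.1; raise 'boom!' }
begin
  barrier.wait
  puts 'All tasks completed'
rescue StandardError => e
  puts "Task error: #{e.message}" # reached after ~0.1s, not 5s
ensure
  barrier.stop
end
```

Because tasks deregister themselves, `wait` needs no extra task per child; it sleeps on the condition until either every task has finished or a failure has been recorded.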
-
I think this is what I attempted: use a notification to let tasks inform the barrier whether they succeed or fail, so the barrier can fail fast. But I'm sure you can improve on it :-)
-
This is an old discussion, but I have recently been down a fail-fast Barrier rabbit hole. I was starting tasks for IO read and write loops, and waiting on both for a graceful (or not) disconnect. While investigating how to solve that, I came across this discussion and the suggestion from …, which led to:

module Async
  class EnumerableWaiter
    include Enumerable

    # @param [:async | nil] parent the parent task to use for asynchronous operations.
    def initialize(parent: nil)
      @parent = parent
      @queue = Queue.new
      @task_count = 0
    end

    def async(parent: (@parent or Task.current), **options, &block)
      @task_count += 1
      parent.async(**options, finished: @queue, &block)
    end

    # Yield tasks as they complete (including stopped)
    def each_task
      return enum_for(:each_task).lazy unless block_given?
      yield(@queue.dequeue.tap { @task_count -= 1 }) until @task_count.zero?
    end

    # Yield task results as they complete
    def each
      return enum_for(:each).lazy unless block_given?
      each_task { |t| yield t.wait unless t.stopped? }
    end

    # Wait in task completion order, which will raise on first failed task
    def wait
      each(&:itself)
    end

    # eg for parent barrier.stop
    def respond_to_missing?(...)
      @parent.respond_to?(...)
    end

    def method_missing(...)
      @parent.send(...)
    end
  end
end

It can be used as a fail-fast barrier:

Async::EnumerableWaiter.new(parent: Async::Barrier.new).tap do |barrier|
  barrier.async { }
  # ...
  barrier.wait
ensure
  barrier.stop
end

or just as an enumerable over the tasks:

Async::EnumerableWaiter.new.tap do |waiter|
  waiter.async { }
  # ...
  waiter.first(5) # or .chunk, .reduce, .to_a, etc.
end

However, using a shared :finished notification for a set of tasks creates some unexpected constraints:

1 - Task#wait will return early if a shared :finished condition is used:

# `finish!` will set both of these to nil before signaling the condition:
if @block || @fiber
  @finished ||= Condition.new
  @finished.wait
end
# ... proceed to return result or raise

The task calling #wait will be resumed when any task using the shared Queue is signaled. For example, #wait could keep waiting until the signaled task is itself:

if @block || @fiber
  @finished ||= Condition.new
  until @finished.wait.equal?(self); end
end

2 - Task#wait will discard messages from the Queue. Async::Queue#wait is an alias for #dequeue, so the call above removes a message from the Queue. A potential (breaking change) solution would be:

def wait
  @available.wait
end

Finally, I have a suggestion for Async::Barrier to encapsulate the common usage pattern:

def wait!
  yield self if block_given? # start tasks
  wait
ensure
  stop
end

used as:

Async::Barrier.new.wait! do |b|
  b.async { }
  b.async { }
end
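Regarding the second constraint above (Async::Queue#wait being an alias for #dequeue), the proposed change amounts to a wait that blocks until an item is available without removing it. A stdlib sketch of those semantics using Mutex and ConditionVariable; `PeekableQueue` is hypothetical and does not reproduce Async::Queue's internals:

```ruby
# Hypothetical queue (not Async::Queue): #wait blocks until an item is
# available but leaves it in place, so several waiters can observe the same
# notification; only #dequeue removes items.
class PeekableQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @available = ConditionVariable.new
  end

  def enqueue(item)
    @mutex.synchronize do
      @items << item
      @available.broadcast # wake every waiter, not just one
    end
  end

  # Block until the queue is non-empty; return the head without consuming it.
  def wait
    @mutex.synchronize do
      @available.wait(@mutex) while @items.empty?
      @items.first
    end
  end

  # Block until an item is available, then remove and return it.
  def dequeue
    @mutex.synchronize do
      @available.wait(@mutex) while @items.empty?
      @items.shift
    end
  end

  def size
    @mutex.synchronize { @items.size }
  end
end

queue = PeekableQueue.new
waiter = Thread.new { queue.wait } # like a task waiting on a shared :finished queue
queue.enqueue(:task_a_finished)
p waiter.value  # => :task_a_finished
p queue.size    # => 1 (the message is still there for the consumer)
p queue.dequeue # => :task_a_finished
```

With these semantics, several tasks could wait on the same shared queue without stealing each other's messages, although each waiter would still need to check whether the item it sees is the one it cares about.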
-
After considering these discussions and others, I'm going to revisit how
-
When calling wait() on a barrier, it will raise exceptions from any of the tasks in the barrier, but apparently not until all tasks added before the failing task have completed.
Here two seconds pass before the barrier reports the error:
But just by changing the order in which the tasks are created, the barrier now reports the error immediately:
I would prefer that it raises exceptions as soon as any task fails.
It also seems odd to me that the order of the tasks matters, even though they run asynchronously.
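The ordering effect described above can be reproduced outside Async with plain threads, as an analogue rather than the barrier's actual code: joining tasks in creation order only surfaces a failure once every earlier task has finished, whereas collecting outcomes from a shared queue surfaces it immediately. `wait_in_creation_order`, `wait_in_completion_order`, `seconds_until_boom`, and `Boom` are hypothetical names:

```ruby
Boom = Class.new(StandardError)

# Creation order: join each thread in turn. Thread#join re-raises the
# thread's exception, but only once we reach that thread.
def wait_in_creation_order(jobs)
  threads = jobs.map do |job|
    Thread.new do
      Thread.current.report_on_exception = false # re-raised via join instead
      job.call
    end
  end
  threads.each(&:join)
ensure
  threads&.each(&:kill)
end

# Completion order: every thread pushes its outcome onto one queue, and the
# waiter handles outcomes as they arrive.
def wait_in_completion_order(jobs)
  outcomes = Thread::Queue.new
  threads = jobs.map do |job|
    Thread.new do
      job.call
      outcomes << nil
    rescue StandardError => error
      outcomes << error
    end
  end
  jobs.size.times do
    error = outcomes.pop
    raise error if error
  end
ensure
  threads&.each(&:kill)
end

# Seconds elapsed before the block raised Boom (nil if it never did).
def seconds_until_boom
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  nil
rescue Boom
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

jobs = [-> { sleep 1 }, -> { raise Boom, 'ups' }]
puts format('creation order:   %.1fs', seconds_until_boom { wait_in_creation_order(jobs) })
puts format('completion order: %.1fs', seconds_until_boom { wait_in_completion_order(jobs) })
```

Run as-is, the first line should report about one second and the second close to zero; swapping the two jobs makes both report close to zero, which mirrors the behaviour described in the question.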