Skip to content

Commit

Permalink
Support inputs that can be iterated only once
Browse files Browse the repository at this point in the history
Co-authored-by: Shuhei Kadowaki <aviatesk@gmail.com>
Co-authored-by: Jameson Nash <vtjnash@gmail.com>
  • Loading branch information
3 people committed Mar 1, 2024
1 parent ada8030 commit 1bd4973
Showing 1 changed file with 14 additions and 16 deletions.
30 changes: 14 additions & 16 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -370,49 +370,47 @@ waitany(tasks) = _wait_multiple(tasks)
waitall(tasks; failfast=false) = _wait_multiple(tasks; all=true, failfast=failfast)

function _wait_multiple(waiting_tasks; all=false, failfast=false)
foreach(waiting_tasks) do @nospecialize t
if !(t isa Task)
error("Expected an iterator of `Task` object")
end
end

chan = Channel{Tuple{Int,Task}}(Inf)
tasks = Task[]
done_mask = Bool[]
exception = false
nremaining::Int = 0

for (i, t) in enumerate(waiting_tasks)
t = t::Task
t isa Task || error("Expected an iterator of `Task` object")
push!(tasks, t)
if istaskdone(t)
push!(done_mask, true)
exception |= istaskfailed(t)
else
nremaining += 1
push!(done_mask, false)
schedule(Task(() -> begin
_wait(t)
put!(chan, (i, t))
end))
nremaining += 1
end
end

if nremaining == 0
close(chan)
return tasks, Task[]
elseif any(done_mask) && (!all || (failfast && exception))
close(chan)
return tasks[done_mask], tasks[.~done_mask]
end

chan = Channel{Tuple{Int,Task}}(Inf)

for (i, done) in enumerate(done_mask)
if !done
t = tasks[i]
waiter = @task put!(chan, (i, t))
waiter.sticky = false
_wait2(t, waiter)
end
end

while true
i, t = take!(chan)
done_mask[i] = true
exception |= istaskfailed(t)
nremaining -= 1

if nremaining == 0 || failfast && exception
if nremaining == 0 || (!all || failfast && exception)
break
end
end
Expand Down

0 comments on commit 1bd4973

Please sign in to comment.