Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,10 @@ Stacktrace:
```
"""
function bind(c::Channel, task::Task)
ref = WeakRef(c)
register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref))
# TODO: implement "schedulewait" and deprecate taskdone_hook
#T = Task(() -> close_chnl_on_taskdone(task, c))
#schedulewait(task, T)
register_taskdone_hook(task, tsk -> close_chnl_on_taskdone(tsk, c))
return c
end

Expand Down Expand Up @@ -223,33 +225,30 @@ function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
return (chnls, tasks)
end

function close_chnl_on_taskdone(t::Task, ref::WeakRef)
c = ref.value
if c isa Channel
isopen(c) || return
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
function close_chnl_on_taskdone(t::Task, c::Channel)
isopen(c) || return
cleanup = () -> try
isopen(c) || return
if istaskfailed(t)
excp = task_result(t)
if excp isa Exception
close(c, excp)
return
end
close(c)
return
finally
unlock(c)
end
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
close(c)
return
finally
unlock(c)
end
if trylock(c)
# can't use `lock`, since attempts to task-switch to wait for it
# will just silently fail and leave us with broken state
cleanup()
else
# so schedule this to happen once we are finished destroying our task
# (on a new Task)
@async (lock(c); cleanup())
end
nothing
end
Expand Down
11 changes: 6 additions & 5 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,20 @@ using Distributed
@testset "channels bound to tasks" for N in [0, 10]
# Normal exit of task
c = Channel(N)
bind(c, @async (yield(); nothing))
bind(c, @async (GC.gc(); yield(); nothing))
@test_throws InvalidStateException take!(c)
@test !isopen(c)

# Error exception in task
c = Channel(N)
bind(c, @async (yield(); error("foo")))
bind(c, @async (GC.gc(); yield(); error("foo")))
@test_throws ErrorException take!(c)
@test !isopen(c)

# Multiple channels closed by the same bound task
cs = [Channel(N) for i in 1:5]
tf2 = () -> begin
tf2() = begin
GC.gc()
if N > 0
foreach(c -> (@assert take!(c) === 2), cs)
end
Expand Down Expand Up @@ -129,16 +130,16 @@ using Distributed
# Multiple tasks, first one to terminate closes the channel
nth = rand(1:5)
ref = Ref(0)
cond = Condition()
tf3(i) = begin
GC.gc()
if i == nth
ref[] = i
else
sleep(2.0)
end
end

tasks = [Task(()->tf3(i)) for i in 1:5]
tasks = [Task(() -> tf3(i)) for i in 1:5]
c = Channel(N)
foreach(t -> bind(c, t), tasks)
foreach(schedule, tasks)
Expand Down