Skip to content

Commit 85e5c66

Browse files
committed
added tests, added channeled_tasks
1 parent df910d9 commit 85e5c66

File tree

11 files changed

+185
-78
lines changed

11 files changed

+185
-78
lines changed

base/channels.jl

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Other constructors:
2020
type Channel{T} <: AbstractChannel
2121
cond_take::Condition # waiting for data to become available
2222
cond_put::Condition # waiting for a writeable slot
23-
state::Symbol
23+
state::Tuple
2424

2525
data::Array{T,1}
2626
sz_max::Int # maximum size of channel
@@ -39,7 +39,7 @@ type Channel{T} <: AbstractChannel
3939
if sz < 0
4040
throw(ArgumentError("Channel size must be either 0, a positive integer or Inf"))
4141
end
42-
new(Condition(), Condition(), :open, Array{T}(0), sz, Array{Condition}(0))
42+
new(Condition(), Condition(), (:open,nothing), Array{T}(0), sz, Array{Condition}(0))
4343
end
4444

4545
# deprecated empty constructor
@@ -60,6 +60,12 @@ closed_exception() = InvalidStateException("Channel is closed.", :closed)
6060

6161
isbuffered(c::Channel) = c.sz_max==0 ? false : true
6262

63+
function check_channel_state(c::Channel)
64+
if !isopen(c)
65+
isa(c.state[2], Exception) && throw(c.state[2])
66+
throw(closed_exception())
67+
end
68+
end
6369
"""
6470
close(c::Channel)
6571
@@ -69,27 +75,65 @@ Closes a channel. An exception is thrown by:
6975
* [`take!`](@ref) and [`fetch`](@ref) on an empty, closed channel.
7076
"""
7177
function close(c::Channel)
72-
c.state = :closed
78+
c.state = (:closed, nothing)
7379
notify_error(c::Channel, closed_exception())
7480
nothing
7581
end
76-
isopen(c::Channel) = (c.state == :open)
82+
isopen(c::Channel) = (c.state[1] == :open)
83+
84+
"""
85+
bind(chnl::Channel, task::Task)
7786
78-
# Associates the lifetime of the Channel to a Task.
79-
# Closes the channel automatically when the task terminates.
80-
# Any error in the task is also propagated to waiters on the Channel.
81-
function close(c::Channel, t::Task)
87+
Associates the lifetime of `chnl` with a task.
88+
Channel `chnl` is automatically closed when the task terminates.
89+
Any uncaught exception in the task is propagated to all waiters on `chnl`.
90+
91+
The `chnl` object can be explicitly closed independent of task termination.
92+
Terminating tasks have no effect on already closed Channel objects.
93+
94+
When a channel is bound to multiple tasks, the first task to terminate will
95+
close the channel. When multiple channels are bound to the same task,
96+
termination of the task will close all channels.
97+
"""
98+
function bind(c::Channel, task::Task)
8299
ref = WeakRef(c)
83-
register_taskdone_hook(t, tsk->close_chnl_on_taskdone(tsk, ref))
84-
nothing
100+
register_taskdone_hook(task, tsk->close_chnl_on_taskdone(tsk, ref))
101+
c
102+
end
103+
104+
"""
105+
channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
106+
107+
A convenience method to create `n` channels and bind them to tasks started
108+
from the provided functions in a single call. Each `func` must accept `n` arguments
109+
which are the created channels. Channel types and sizes may be specified via
110+
keyword arguments `ctypes` and `csizes` respectively. If unspecified, all channels are
111+
of type `Channel{Any}(0)`.
112+
113+
Returns a tuple, `(Array{Channel}, Array{Task})`, of the created channels and tasks.
114+
"""
115+
function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
116+
@assert length(csizes) == n
117+
@assert length(ctypes) == n
118+
119+
chnls = map(i->Channel{ctypes[i]}(csizes[i]), 1:n)
120+
tasks=Task[Task(()->f(chnls...)) for f in funcs]
121+
122+
# bind all tasks to all channels and schedule them
123+
foreach(t -> foreach(c -> bind(c,t), chnls), tasks)
124+
foreach(t->schedule(t), tasks)
125+
126+
yield() # Allow scheduled tasks to run
127+
128+
return (chnls, tasks)
85129
end
86130

87131
function close_chnl_on_taskdone(t::Task, ref::WeakRef)
88132
if ref.value !== nothing
89133
c = ref.value
90134
!isopen(c) && return
91135
if istaskfailed(t)
92-
c.state = :closed
136+
c.state = (:closed, task_result(t))
93137
notify_error(c, task_result(t))
94138
else
95139
close(c)
@@ -111,8 +155,10 @@ For unbuffered channels, blocks until a [`take!`](@ref) is performed by a differ
111155
task.
112156
"""
113157
function put!(c::Channel, v)
114-
!isopen(c) && throw(closed_exception())
158+
check_channel_state(c)
115159
isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v)
160+
yield()
161+
v
116162
end
117163

118164
function put_buffered(c::Channel, v)
@@ -170,7 +216,7 @@ shift!(c::Channel) = take!(c)
170216

171217
# 0-size channel
172218
function take_unbuffered(c::Channel)
173-
!isopen(c) && throw(closed_exception())
219+
check_channel_state(c)
174220
cond_taker = Condition()
175221
push!(c.takers, cond_taker)
176222
notify(c.cond_put, nothing, false, false)
@@ -200,7 +246,7 @@ n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put)
200246

201247
function wait(c::Channel)
202248
while !isready(c)
203-
!isopen(c) && throw(closed_exception())
249+
check_channel_state(c)
204250
wait(c.cond_take)
205251
end
206252
nothing
@@ -216,14 +262,14 @@ eltype{T}(::Type{Channel{T}}) = T
216262

217263
show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")
218264

219-
type ChannelState{T}
265+
type ChannelIterState{T}
220266
hasval::Bool
221267
val::T
222-
ChannelState(x) = new(x)
268+
ChannelIterState(x) = new(x)
223269
end
224270

225-
start{T}(c::Channel{T}) = ChannelState{T}(false)
226-
function done(c::Channel, state::ChannelState)
271+
start{T}(c::Channel{T}) = ChannelIterState{T}(false)
272+
function done(c::Channel, state::ChannelIterState)
227273
try
228274
# we are waiting either for more data or channel to be closed
229275
state.hasval && return false

base/docs/helpdb/Base.jl

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -707,14 +707,6 @@ Show every part of the representation of a value.
707707
"""
708708
dump
709709

710-
"""
711-
consume(task, values...)
712-
713-
Receive the next value passed to `produce` by the specified task. Additional arguments may
714-
be passed, to be returned from the last `produce` call in the producer.
715-
"""
716-
consume
717-
718710
"""
719711
cummax(A, [dim])
720712
@@ -1886,14 +1878,6 @@ the topmost backend that does not throw a `MethodError`).
18861878
"""
18871879
pushdisplay
18881880

1889-
"""
1890-
produce(value)
1891-
1892-
Send the given value to the last `consume` call, switching to the consumer task. If the next
1893-
`consume` call passes any values, they are returned by `produce`.
1894-
"""
1895-
produce
1896-
18971881
"""
18981882
StackOverflowError()
18991883

base/exports.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,7 @@ export
950950
sizeof,
951951

952952
# tasks and conditions
953+
channeled_tasks,
953954
Condition,
954955
consume,
955956
current_task,

base/file.jl

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -470,28 +470,24 @@ function walkdir(root; topdown=true, follow_symlinks=false, onerror=throw)
470470
end
471471

472472
function _it(chnl)
473-
close(chnl, current_task())
474473
if topdown
475474
put!(chnl, (root, dirs, files))
476-
yield()
477475
end
478476
for dir in dirs
479477
path = joinpath(root,dir)
480478
if follow_symlinks || !islink(path)
481479
for (root_l, dirs_l, files_l) in walkdir(path, topdown=topdown, follow_symlinks=follow_symlinks, onerror=onerror)
482480
put!(chnl, (root_l, dirs_l, files_l))
483-
yield()
484481
end
485482
end
486483
end
487484
if !topdown
488485
put!(chnl, (root, dirs, files))
489-
yield()
490486
end
491487
end
492-
chnl = Channel(0)
493-
@schedule _it(chnl)
494-
return chnl
488+
489+
chnls, _ = channeled_tasks(1, _it)
490+
return chnls[1]
495491
end
496492

497493
function unlink(p::AbstractString)

base/task.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ suppress_excp_printing(t::Task) = isa(t.storage, ObjectIdDict) ? get(get_task_tl
187187
function register_taskdone_hook(t::Task, hook)
188188
tls = get_task_tls(t)
189189
push!(get!(tls, :TASKDONE_HOOKS, []), hook)
190-
nothing
190+
t
191191
end
192192

193193
# runtime system hook called when a task finishes

doc/src/stdlib/parallel.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ Base.yieldto
88
Base.current_task
99
Base.istaskdone
1010
Base.istaskstarted
11-
Base.consume
12-
Base.produce
1311
Base.yield
1412
Base.task_local_storage(::Any)
1513
Base.task_local_storage(::Any, ::Any)
@@ -26,6 +24,8 @@ Base.take!(::Channel)
2624
Base.isready(::Channel)
2725
Base.fetch(::Channel)
2826
Base.close(::Channel)
27+
Base.bind(c::Channel, task::Task)
28+
Base.channeled_tasks
2929
Base.asyncmap
3030
Base.asyncmap!
3131
```

test/bitarray.jl

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,10 @@ let b1 = bitrand(n1, n2)
302302
put!(c, (randperm(m1), :, BitMatrix))
303303
m1, m2 = rand_m1m2()
304304
put!(c, (randperm(m1), randperm(m2), BitMatrix))
305-
close(c)
306305
end
307-
c = Channel(0)
308-
@schedule gen_getindex_data(c)
309-
for (k1, k2, T) in c
306+
307+
chnls, _ = channeled_tasks(1, gen_getindex_data)
308+
for (k1, k2, T) in chnls[1]
310309
# println(typeof(k1), " ", typeof(k2), " ", T) # uncomment to debug
311310
@check_bit_operation getindex(b1, k1, k2) T
312311
end
@@ -349,12 +348,10 @@ let b1 = bitrand(s1, s2, s3, s4)
349348

350349
m1, m2, m3, m4 = (2:3, 2:7, 1:2, 4:6)
351350
put!(c, (m1, m2, m3, m4, BitArray{4}))
352-
close(c)
353351
end
354352

355-
c = Channel(0)
356-
@schedule gen_getindex_data4(c)
357-
for (k1, k2, k3, k4, T) in c
353+
chnls, _ = channeled_tasks(1, gen_getindex_data4)
354+
for (k1, k2, k3, k4, T) in chnls[1]
358355
#println(typeof(k1), " ", typeof(k2), " ", typeof(k3), " ", typeof(k4), " ", T) # uncomment to debug
359356
@check_bit_operation getindex(b1, k1, k2, k3, k4) T
360357
end
@@ -393,12 +390,10 @@ let b1 = bitrand(n1, n2)
393390
m1, m2 = rand_m1m2()
394391
put!(c, (rand(Bool), randperm(m1), randperm(m2)))
395392
put!(c, (bitrand(m1,m2), randperm(m1), randperm(m2)))
396-
close(c)
397393
end
398394

399-
c = Channel(0)
400-
@schedule gen_setindex_data(c)
401-
for (b2, k1, k2) in c
395+
chnls, _ = channeled_tasks(1, gen_setindex_data)
396+
for (b2, k1, k2) in chnls[1]
402397
# println(typeof(b2), " ", typeof(k1), " ", typeof(k2)) # uncomment to debug
403398
@check_bit_operation setindex!(b1, b2, k1, k2) BitMatrix
404399
end
@@ -451,12 +446,10 @@ let b1 = bitrand(s1, s2, s3, s4)
451446

452447
m1, m2, m3, m4 = (2:3, 2:7, 1:2, 4:6)
453448
put!(c, (bitrand(2, 6, 2, 3), m1, m2, m3, m4))
454-
close(c)
455449
end
456450

457-
c = Channel(0)
458-
@schedule gen_setindex_data4(c)
459-
for (b2, k1, k2, k3, k4) in c
451+
chnls, _ = channeled_tasks(1, gen_setindex_data4)
452+
for (b2, k1, k2, k3, k4) in chnls[1]
460453
# println(typeof(b2), " ", typeof(k1), " ", typeof(k2), " ", typeof(k3), " ", typeof(k4)) # uncomment to debug
461454
@check_bit_operation setindex!(b1, b2, k1, k2, k3, k4) BitArray{4}
462455
end

0 commit comments

Comments
 (0)