
Commit a765cbe

jpsamaroo committed, with JamesWrigley and davidizzle as co-authors

Add streaming API

Co-authored-by: JamesWrigley <james@puiterwijk.org>
Co-authored-by: davidizzle <davide.ferretti.j@gmail.com>

1 parent 577e179 commit a765cbe

15 files changed: +1362 additions, -9 deletions

Project.toml

Lines changed: 0 additions & 1 deletion

```diff
@@ -11,7 +11,6 @@ Graphs = "86223c79-3864-5bf0-83f7-82e725a168b6"
 LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 MacroTools = "1914dd2f-81c6-5fcd-8719-6d5c9610ff09"
 MemPool = "f9f48841-c794-520a-933b-121f7ba6ed94"
-Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
 OnlineStats = "a15396b6-48d5-5d58-9928-6d29437db91e"
 PrecompileTools = "aea7be01-6a6a-4083-8856-8a6e6704d82a"
 Preferences = "21216c6a-2e73-6563-6e65-726566657250"
```

docs/make.jl

Lines changed: 1 addition & 0 deletions

```diff
@@ -22,6 +22,7 @@ makedocs(;
     "Task Spawning" => "task-spawning.md",
     "Data Management" => "data-management.md",
     "Distributed Arrays" => "darray.md",
+    "Streaming Tasks" => "streaming.md",
     "Scopes" => "scopes.md",
     "Processors" => "processors.md",
     "Task Queues" => "task-queues.md",
```

docs/src/index.md

Lines changed: 34 additions & 0 deletions

@@ -394,3 +394,37 @@

In contrast to the previous example, here the tasks are executed without argument annotations. As a result, the `copyto!` task may execute before the `sort!` task, leading to unexpected results in the output array `C`.

## Quickstart: Streaming

Dagger.jl provides a streaming API that allows you to process data in a streaming fashion: data is processed as it becomes available, rather than waiting for the entire dataset to be loaded into memory.

For more details: [Streaming](@ref)

### Syntax

The `Dagger.spawn_streaming()` function is used to create a streaming region, where tasks are executed continuously, processing data as it becomes available:

```julia
# Open a file to write to on this worker
f = Dagger.@mutable open("output.txt", "w")
t = Dagger.spawn_streaming() do
    # Generate random numbers continuously
    val = Dagger.@spawn rand()
    # Write each random number to a file
    Dagger.@spawn (f, val) -> begin
        if val < 0.01
            # Finish streaming when the random number is less than 0.01
            Dagger.finish_stream()
        end
        println(f, val)
    end
end
# Wait for all values to be generated and written
wait(t)
```

The above example demonstrates a streaming region that continuously generates random numbers and writes each one to a file. The streaming region terminates when a random number less than 0.01 is generated, by calling `Dagger.finish_stream()` (which exits the current streaming task).

docs/src/streaming.md

Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@

# Streaming

Dagger tasks have a limited lifetime - they are created, execute, finish, and are eventually destroyed when they're no longer needed. Thus, if one wants to run the same kind of computations over and over, one might re-create a similar set of tasks for each unit of data that needs processing.

This might be fine for computations which take a long time to run (thus dwarfing the cost of task creation, which is quite small), or when working with a limited set of data, but this approach is not great for doing lots of small computations on a large (or endless) amount of data. For example: processing image frames from a webcam, reacting to messages from a message bus, reading samples from a software radio, etc. All of these tasks are better suited to a "streaming" model of data processing, where data is simply piped into a continuously-running task (or DAG of tasks) forever, or until the data runs out.

Thankfully, if you have a problem which is best modeled as a streaming system of tasks, Dagger has you covered! Building on its support for [Task Queues](@ref), Dagger provides a means to convert an entire DAG of tasks into a streaming DAG, where data flows into and out of each task asynchronously, using the `spawn_streaming` function:

```julia
Dagger.spawn_streaming() do # enters a streaming region
    vals = Dagger.@spawn rand()
    print_vals = Dagger.@spawn println(vals)
end # exits the streaming region, and starts the DAG running
```

In the above example, `vals` is a Dagger task which has been transformed to run in a streaming manner - instead of just calling `rand()` once and returning its result, it will re-run `rand()` endlessly, continuously producing new random values. In typical Dagger style, `print_vals` is a Dagger task which depends on `vals`, but in streaming form - it will continuously `println` the random values produced from `vals`. Both tasks will run forever, and will run efficiently, only doing the work necessary to generate, transfer, and consume values.

As the comments point out, `spawn_streaming` creates a streaming region, during which `vals` and `print_vals` are created and configured. Both tasks are halted until `spawn_streaming` returns, allowing large DAGs to be built all at once, without any task losing a single value. If desired, streaming regions can be connected, although some values might be lost while tasks are being connected:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.@spawn rand()
end

# Some values might be generated by `vals` but thrown away
# before `print_vals` is fully setup and connected to it

print_vals = Dagger.spawn_streaming() do
    Dagger.@spawn println(vals)
end
```

More complicated streaming DAGs can be easily constructed, without doing anything different. For example, we can generate multiple streams of random numbers, write them all to their own files, and print the combined results:

```julia
Dagger.spawn_streaming() do
    all_vals = [Dagger.spawn(rand) for i in 1:4]
    all_vals_written = map(1:4) do i
        Dagger.spawn(all_vals[i]) do val
            open("results_$i.txt"; write=true, create=true, append=true) do io
                println(io, repr(val))
            end
            return val
        end
    end
    Dagger.spawn(all_vals_written...) do all_vals_written...
        vals_sum = sum(all_vals_written)
        println(vals_sum)
    end
end
```

If you want to stop the streaming DAG and tear it all down, you can call `Dagger.cancel!.(all_vals)` and `Dagger.cancel!.(all_vals_written)` to terminate each streaming task. In the future, a more convenient way to tear down a full DAG will be added; for now, each task must be cancelled individually.

Alternatively, tasks can stop themselves from the inside with `finish_stream`, optionally returning a value that can be `fetch`'d. Let's do this when our randomly-drawn number falls within some arbitrary range:

```julia
vals = Dagger.spawn_streaming() do
    Dagger.spawn() do
        x = rand()
        if x < 0.001
            # That's good enough, let's be done
            return Dagger.finish_stream("Finished!")
        end
        return x
    end
end
fetch(vals)
```

In this example, the call to `fetch` will hang (while random numbers continue to be drawn), until a drawn number is less than 0.001; at that point, `fetch` will return with `"Finished!"`, and the task `vals` will have terminated.

src/Dagger.jl

Lines changed: 5 additions & 1 deletion

```diff
@@ -21,7 +21,6 @@ if !isdefined(Base, :ScopedValues)
 else
     import Base.ScopedValues: ScopedValue, with
 end
-
 import TaskLocalValues: TaskLocalValue
 
 if !isdefined(Base, :get_extension)
@@ -78,6 +77,11 @@ include("sch/Sch.jl"); using .Sch
 # Data dependency task queue
 include("datadeps.jl")
 
+# Streaming
+include("stream.jl")
+include("stream-buffers.jl")
+include("stream-transfer.jl")
+
 # Array computations
 include("array/darray.jl")
 include("array/alloc.jl")
```

src/sch/Sch.jl

Lines changed: 6 additions & 4 deletions

```diff
@@ -259,9 +259,11 @@ end
 Combine `SchedulerOptions` and `ThunkOptions` into a new `ThunkOptions`.
 """
 function Base.merge(sopts::SchedulerOptions, topts::ThunkOptions)
-    single = topts.single !== nothing ? topts.single : sopts.single
-    allow_errors = topts.allow_errors !== nothing ? topts.allow_errors : sopts.allow_errors
-    proclist = topts.proclist !== nothing ? topts.proclist : sopts.proclist
+    select_option = (sopt, topt) -> isnothing(topt) ? sopt : topt
+
+    single = select_option(sopts.single, topts.single)
+    allow_errors = select_option(sopts.allow_errors, topts.allow_errors)
+    proclist = select_option(sopts.proclist, topts.proclist)
     ThunkOptions(single,
                  proclist,
                  topts.time_util,
@@ -1376,7 +1378,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
         if unwrap_nested_exception(err) isa InvalidStateException || !isopen(return_queue)
             @dagdebug thunk_id :execute "Return queue is closed, failing to put result" chan=return_queue exception=(err, catch_backtrace())
         else
-            rethrow(err)
+            rethrow()
         end
     finally
         # Ensure that any spawned tasks get cleaned up
```
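The `select_option` refactor above centralizes the "task option overrides scheduler default" rule. A standalone sketch of the same semantics, using hypothetical simplified option tuples rather than Dagger's real `SchedulerOptions`/`ThunkOptions` structs:

```julia
# Hypothetical simplified option records; the real structs have more fields.
sched_opts = (single = 1, allow_errors = nothing)     # scheduler-wide defaults
task_opts  = (single = nothing, allow_errors = true)  # per-task overrides

# Same pattern as the commit: prefer the task-level option when it is set.
select_option = (sopt, topt) -> isnothing(topt) ? sopt : topt

merged = (single = select_option(sched_opts.single, task_opts.single),
          allow_errors = select_option(sched_opts.allow_errors, task_opts.allow_errors))
# merged.single comes from the scheduler (1);
# merged.allow_errors comes from the task (true)
```

The closure keeps the override rule in one place, so adding a fourth or fifth option field cannot accidentally invert the precedence.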

src/sch/eager.jl

Lines changed: 7 additions & 0 deletions

```diff
@@ -124,6 +124,13 @@ function eager_cleanup(state, uid)
         # N.B. cache and errored expire automatically
         delete!(state.thunk_dict, tid)
     end
+    remotecall_wait(1, uid) do uid
+        lock(Dagger.EAGER_THUNK_STREAMS) do global_streams
+            if haskey(global_streams, uid)
+                delete!(global_streams, uid)
+            end
+        end
+    end
 end
 
 function _find_thunk(e::Dagger.DTask)
```

src/sch/util.jl

Lines changed: 2 additions & 0 deletions

```diff
@@ -31,6 +31,8 @@ unwrap_nested_exception(err::RemoteException) =
     unwrap_nested_exception(err.captured)
 unwrap_nested_exception(err::DTaskFailedException) =
     unwrap_nested_exception(err.ex)
+unwrap_nested_exception(err::TaskFailedException) =
+    unwrap_nested_exception(err.t.exception)
 unwrap_nested_exception(err) = err
 
 "Gets a `NamedTuple` of options propagated by `thunk`."
```
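`unwrap_nested_exception` recursively strips wrapper exceptions until it reaches the root cause; the new method handles `TaskFailedException` the same way the existing `RemoteException` and `DTaskFailedException` methods do. A minimal sketch with a hypothetical wrapper type (not the real exception types) illustrating the dispatch-plus-recursion pattern:

```julia
# Hypothetical wrapper standing in for RemoteException/TaskFailedException etc.
struct WrappedError
    inner
end

# One method per wrapper type peels off a layer; the fallback returns the error.
unwrap_nested(err::WrappedError) = unwrap_nested(err.inner)
unwrap_nested(err) = err

# Doubly-wrapped errors collapse to the innermost cause.
root = unwrap_nested(WrappedError(WrappedError(ArgumentError("boom"))))
# root is the innermost ArgumentError
```

Each real wrapper type stores its payload in a different field (`captured`, `ex`, `t.exception`), which is why Dagger needs one method per type rather than a single generic one.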

src/stream-buffers.jl

Lines changed: 64 additions & 0 deletions

@@ -0,0 +1,64 @@

```julia
"A process-local ring buffer."
mutable struct ProcessRingBuffer{T}
    read_idx::Int
    write_idx::Int
    @atomic count::Int
    buffer::Vector{T}
    @atomic open::Bool
    function ProcessRingBuffer{T}(len::Int=1024) where T
        buffer = Vector{T}(undef, len)
        return new{T}(1, 1, 0, buffer, true)
    end
end
Base.isempty(rb::ProcessRingBuffer) = (@atomic rb.count) == 0
isfull(rb::ProcessRingBuffer) = (@atomic rb.count) == length(rb.buffer)
capacity(rb::ProcessRingBuffer) = length(rb.buffer)
Base.length(rb::ProcessRingBuffer) = @atomic rb.count
Base.isopen(rb::ProcessRingBuffer) = @atomic rb.open
function Base.close(rb::ProcessRingBuffer)
    @atomic rb.open = false
end
function Base.put!(rb::ProcessRingBuffer{T}, x) where T
    while isfull(rb)
        yield()
        if !isopen(rb)
            throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
        end
        task_may_cancel!(; must_force=true)
    end
    to_write_idx = mod1(rb.write_idx, length(rb.buffer))
    rb.buffer[to_write_idx] = convert(T, x)
    rb.write_idx += 1
    @atomic rb.count += 1
end
function Base.take!(rb::ProcessRingBuffer)
    while isempty(rb)
        yield()
        if !isopen(rb) && isempty(rb)
            throw(InvalidStateException("ProcessRingBuffer is closed", :closed))
        end
        if task_cancelled() && isempty(rb)
            # We respect a graceful cancellation only if the buffer is empty.
            # Otherwise, we may have values to continue communicating.
            task_may_cancel!()
        end
        task_may_cancel!(; must_force=true)
    end
    to_read_idx = rb.read_idx
    rb.read_idx += 1
    @atomic rb.count -= 1
    to_read_idx = mod1(to_read_idx, length(rb.buffer))
    return rb.buffer[to_read_idx]
end

"""
`take!()` all the elements from a buffer and put them in a `Vector`.
"""
function collect!(rb::ProcessRingBuffer{T}) where T
    output = Vector{T}(undef, rb.count)
    for i in 1:rb.count
        output[i] = take!(rb)
    end

    return output
end
```
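The buffer above relies on monotonically increasing `read_idx`/`write_idx` that are wrapped with `mod1` only at access time. A single-threaded sketch of just that indexing scheme (omitting the atomics, `open` flag, and cancellation hooks) shows how slots get reused once the writer laps the buffer:

```julia
# Minimal single-threaded sketch of the ring-buffer indexing used above.
mutable struct MiniRing{T}
    read_idx::Int
    write_idx::Int
    count::Int
    buffer::Vector{T}
end
MiniRing{T}(len::Int) where T = MiniRing{T}(1, 1, 0, Vector{T}(undef, len))

function push_val!(rb::MiniRing{T}, x) where T
    rb.count == length(rb.buffer) && error("buffer full")
    # Indices grow forever; mod1 maps them onto 1:length(buffer).
    rb.buffer[mod1(rb.write_idx, length(rb.buffer))] = convert(T, x)
    rb.write_idx += 1
    rb.count += 1
end

function pop_val!(rb::MiniRing)
    rb.count == 0 && error("buffer empty")
    v = rb.buffer[mod1(rb.read_idx, length(rb.buffer))]
    rb.read_idx += 1
    rb.count -= 1
    return v
end

rb = MiniRing{Int}(2)
push_val!(rb, 1); push_val!(rb, 2)
a = pop_val!(rb)                    # 1
push_val!(rb, 3)                    # wraps around into the freed slot
b, c = pop_val!(rb), pop_val!(rb)   # 2, then 3
```

In the real `ProcessRingBuffer`, `count` is the only field shared between reader and writer, which is why it alone is `@atomic` while the indices stay plain.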

src/stream-transfer.jl

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@

```julia
struct RemoteChannelFetcher
    chan::RemoteChannel
    RemoteChannelFetcher() = new(RemoteChannel())
end
const _THEIR_TID = TaskLocalValue{Int}(()->0)
function stream_push_values!(fetcher::RemoteChannelFetcher, T, our_store::StreamStore, their_stream::Stream, buffer)
    our_tid = STREAM_THUNK_ID[]
    our_uid = our_store.uid
    their_uid = their_stream.uid
    if _THEIR_TID[] == 0
        _THEIR_TID[] = remotecall_fetch(1) do
            lock(Sch.EAGER_ID_MAP) do id_map
                id_map[their_uid]
            end
        end
    end
    their_tid = _THEIR_TID[]
    @dagdebug our_tid :stream_push "taking output value: $our_tid -> $their_tid"
    value = try
        take!(buffer)
    catch
        close(fetcher.chan)
        rethrow()
    end
    @lock our_store.lock notify(our_store.lock)
    @dagdebug our_tid :stream_push "pushing output value: $our_tid -> $their_tid"
    try
        put!(fetcher.chan, value)
    catch err
        if err isa InvalidStateException && !isopen(fetcher.chan)
            @dagdebug our_tid :stream_push "channel closed: $our_tid -> $their_tid"
            throw(InterruptException())
        end
        # N.B. We don't close the buffer to allow for eventual reconnection
        rethrow()
    end
    @dagdebug our_tid :stream_push "finished pushing output value: $our_tid -> $their_tid"
end
function stream_pull_values!(fetcher::RemoteChannelFetcher, T, our_store::StreamStore, their_stream::Stream, buffer)
    our_tid = STREAM_THUNK_ID[]
    our_uid = our_store.uid
    their_uid = their_stream.uid
    if _THEIR_TID[] == 0
        _THEIR_TID[] = remotecall_fetch(1) do
            lock(Sch.EAGER_ID_MAP) do id_map
                id_map[their_uid]
            end
        end
    end
    their_tid = _THEIR_TID[]
    @dagdebug our_tid :stream_pull "pulling input value: $their_tid -> $our_tid"
    value = try
        take!(fetcher.chan)
    catch err
        if err isa InvalidStateException && !isopen(fetcher.chan)
            @dagdebug our_tid :stream_pull "channel closed: $their_tid -> $our_tid"
            throw(InterruptException())
        end
        # N.B. We don't close the buffer to allow for eventual reconnection
        rethrow()
    end
    @dagdebug our_tid :stream_pull "putting input value: $their_tid -> $our_tid"
    try
        put!(buffer, value)
    catch
        close(fetcher.chan)
        rethrow()
    end
    @lock our_store.lock notify(our_store.lock)
    @dagdebug our_tid :stream_pull "finished putting input value: $their_tid -> $our_tid"
end
```
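`stream_push_values!` and `stream_pull_values!` form a producer/consumer pair over `fetcher.chan`. A single-process analogue of that handoff, using a plain `Channel` instead of a `RemoteChannel` and omitting the ring buffering, locking, and cancellation details:

```julia
# Single-process sketch of the push/pull handoff: a producer task pushes
# values into a channel, a consumer task pulls them into a local buffer.
chan = Channel{Float64}(8)
buffer = Float64[]

producer = @async begin
    for x in (0.1, 0.2, 0.3)
        put!(chan, x)   # analogue of stream_push_values!
    end
    close(chan)         # signals the consumer that the stream has ended
end

consumer = @async begin
    for v in chan       # analogue of stream_pull_values!; ends when chan closes
        push!(buffer, v)
    end
end

wait(producer); wait(consumer)
# buffer now holds all produced values in order
```

The real implementation additionally treats a closed channel as a graceful shutdown signal (converting the `InvalidStateException` into an `InterruptException`) so that a torn-down neighbor terminates the task rather than crashing it.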
