1
1
struct TapedTaskException
2
- exc
2
+ exc:: Exception
3
+ backtrace
3
4
end
4
5
5
6
struct TapedTask
6
7
task:: Task
7
8
tf:: TapedFunction
8
- counter:: Ref{Int}
9
9
produce_ch:: Channel{Any}
10
10
consume_ch:: Channel{Int}
11
11
produced_val:: Vector{Any}
12
12
13
13
function TapedTask (
14
- t:: Task , tf:: TapedFunction , counter, pch:: Channel{Any} , cch:: Channel{Int} )
15
- new (t, tf, counter, pch, cch, Any[])
14
+ t:: Task , tf:: TapedFunction , pch:: Channel{Any} , cch:: Channel{Int} )
15
+ new (t, tf, pch, cch, Any[])
16
16
end
17
17
end
18
18
19
19
function TapedTask (tf:: TapedFunction , args... )
20
20
tf. owner != nothing && error (" TapedFunction is owned to another task." )
21
- # dry_run(tf)
22
21
isempty (tf. tape) && tf (args... )
23
- counter = Ref {Int} (1 )
24
22
produce_ch = Channel ()
25
23
consume_ch = Channel {Int} ()
26
24
task = @task try
27
- step_in (tf, counter , args)
25
+ step_in (tf. tape , args)
28
26
catch e
29
- put! (produce_ch, TapedTaskException (e))
30
- # @error "TapedTask Error: " exception=(e, catch_backtrace())
27
+ bt = catch_backtrace ()
28
+ put! (produce_ch, TapedTaskException (e, bt))
29
+ # @error "TapedTask Error: " exception=(e, bt)
31
30
rethrow ()
32
31
finally
33
32
@static if VERSION >= v " 1.4"
@@ -40,7 +39,7 @@ function TapedTask(tf::TapedFunction, args...)
40
39
close (produce_ch)
41
40
close (consume_ch)
42
41
end
43
- t = TapedTask (task, tf, counter, produce_ch, consume_ch)
42
+ t = TapedTask (task, tf, produce_ch, consume_ch)
44
43
task. storage === nothing && (task. storage = IdDict ())
45
44
task. storage[:tapedtask ] = t
46
45
tf. owner = t
@@ -53,25 +52,31 @@ TapedTask(f, args...) = TapedTask(TapedFunction(f, arity=length(args)), args...)
53
52
TapedTask (t:: TapedTask , args... ) = TapedTask (func (t), args... )
54
53
func (t:: TapedTask ) = t. tf. func
55
54
56
- function step_in (tf:: TapedFunction , counter:: Ref{Int} , args)
57
- len = length (tf. tape)
58
- if (counter[] <= 1 && length (args) > 0 )
55
+
56
+ function step_in (t:: Tape , args)
57
+ len = length (t)
58
+ if (t. counter <= 1 && length (args) > 0 )
59
59
input = map (box, args)
60
- tf . tape [1 ]. input = input
60
+ t [1 ]. input = input
61
61
end
62
- while counter[] <= len
63
- tf . tape[ counter[] ]()
62
+ while t . counter <= len
63
+ t[t . counter]()
64
64
# produce and wait after an instruction is done
65
- ttask = tf . owner
65
+ ttask = t . owner . owner
66
66
if length (ttask. produced_val) > 0
67
67
val = pop! (ttask. produced_val)
68
68
put! (ttask. produce_ch, val)
69
69
take! (ttask. consume_ch) # wait for next consumer
70
70
end
71
- counter[] += 1
71
+ increase_counter! (t)
72
72
end
73
73
end
74
74
75
+ function next_step! (t:: TapedTask )
76
+ increase_counter! (t. tf. tape)
77
+ return t
78
+ end
79
+
75
80
#=
76
81
# ** Approach (A) to implement `produce`:
77
82
# Make`produce` a standalone instturction. This approach does NOT
@@ -186,18 +191,21 @@ function copy_box(old_box::Box{T}, roster::Dict{UInt64, Any}) where T
186
191
end
187
192
copy_box (o, roster:: Dict{UInt64, Any} ) = o
188
193
189
- function Base. copy (t:: Tape )
194
+ function Base. copy (x:: Instruction , on_tape:: Tape , roster:: Dict{UInt64, Any} )
195
+ input = map (x. input) do ob
196
+ copy_box (ob, roster)
197
+ end
198
+ output = copy_box (x. output, roster)
199
+ Instruction (x. fun, input, output, on_tape)
200
+ end
201
+
202
+ function Base. copy (t:: Tape , roster:: Dict{UInt64, Any} )
190
203
old_data = t. tape
191
- new_data = Vector {Instruction } ()
192
- new_tape = Tape (new_data, t. owner)
204
+ new_data = Vector {AbstractInstruction } ()
205
+ new_tape = Tape (new_data, t. counter, t . owner)
193
206
194
- roster = Dict {UInt64, Any} ()
195
207
for x in old_data
196
- input = map (x. input) do ob
197
- copy_box (ob, roster)
198
- end
199
- output = copy_box (x. output, roster)
200
- new_ins = Instruction (x. fun, input, output, new_tape)
208
+ new_ins = copy (x, new_tape, roster)
201
209
push! (new_data, new_ins)
202
210
end
203
211
207
215
function Base. copy (tf:: TapedFunction )
208
216
new_tf = TapedFunction (tf. func; arity= tf. arity)
209
217
new_tf. ir = tf. ir
210
- new_tape = copy (tf. tape)
211
- new_tape. owner = new_tf
218
+ roster = Dict {UInt64, Any} ()
219
+ new_tape = copy (tf. tape, roster)
220
+ setowner! (new_tape, new_tf)
212
221
new_tf. tape = new_tape
213
222
return new_tf
214
223
end
@@ -217,6 +226,8 @@ function Base.copy(t::TapedTask)
217
226
# t.counter[] <= 1 && error("Can't copy a TapedTask which is not running.")
218
227
tf = copy (t. tf)
219
228
new_t = TapedTask (tf)
220
- new_t. counter[] = t. counter[] + 1
229
+ new_t. task. storage = copy (t. task. storage)
230
+ new_t. task. storage[:tapedtask ] = new_t
231
+ next_step! (new_t)
221
232
return new_t
222
233
end
0 commit comments