-
Notifications
You must be signed in to change notification settings - Fork 0
/
coroutine.go
392 lines (338 loc) · 8.67 KB
/
coroutine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
package async
import "path"
const (
doEnd = iota
doYield
doSwitch
)
const (
flagStale = 1 << iota
flagResumed
flagEnded
flagRecyclable
flagRecycled
)
// A Coroutine is an execution of code, similar to a goroutine but cooperative
// and stackless.
//
// A Coroutine is created with a function called [Task].
// A Coroutine's job is to end the Task.
// When an [Executor] spawns a Coroutine with a Task, it runs the Coroutine by
// calling the Task function with the Coroutine as the argument.
// The return value determines whether to end the Coroutine or to yield it
// so that it could resume later.
//
// In order for a Coroutine to resume, the Coroutine must watch at least one
// [Event] (e.g. [Signal], [State] and [Memo], etc.), when calling the Task
// function.
// A notification of such an Event resumes the Coroutine.
// When a Coroutine is resumed, the Executor runs the Coroutine again.
//
// A Coroutine can also switch to work on another Task function according to
// the return value of the Task function.
// A Coroutine can switch from one Task to another until a Task ends it.
type Coroutine struct {
executor *Executor
path string
task Task
flag uint8
deps map[Event]bool
inners []coroutineOrFunc
outer *Coroutine
}
type coroutineOrFunc struct {
co *Coroutine
f func()
}
func (e *Executor) newCoroutine() *Coroutine {
if co := e.pool.Get(); co != nil {
return co.(*Coroutine)
}
return new(Coroutine)
}
func (e *Executor) freeCoroutine(co *Coroutine) {
if co.flag&(flagRecyclable|flagRecycled) == flagRecyclable {
co.executor = nil
co.task = nil
co.flag |= flagRecycled
co.outer = nil
e.pool.Put(co)
}
}
func (co *Coroutine) init(e *Executor, p string, t Task) *Coroutine {
co.executor = e
co.path = p
co.task = t
co.flag = flagStale
return co
}
func (co *Coroutine) recyclable() *Coroutine {
co.flag |= flagRecyclable
return co
}
func (co *Coroutine) less(other *Coroutine) bool {
return co.path < other.path
}
func (co *Coroutine) resume() {
flag := co.flag
if flag&flagEnded != 0 {
return
}
if flag&flagResumed != 0 {
co.flag = flag | flagStale
return
}
co.flag = flag | flagStale | flagResumed
co.executor.resumeCoroutine(co)
}
func (e *Executor) runCoroutine(co *Coroutine) {
flag := co.flag
flag &^= flagResumed
co.flag = flag
if flag&flagEnded != 0 {
e.freeCoroutine(co)
return
}
if flag&flagStale == 0 {
return
}
e.mu.Unlock()
co.run()
e.mu.Lock()
}
func (co *Coroutine) run() {
{
deps := co.deps
for d := range deps {
deps[d] = false
}
}
var res Result
for {
co.clearInners()
co.flag &^= flagStale | flagEnded
res = co.task(co)
if res.switchTo != nil {
co.task = res.switchTo
}
if res.action != doSwitch {
break
}
co.clearDeps()
}
if res.action != doEnd {
deps := co.deps
for d, inUse := range deps {
if !inUse {
delete(deps, d)
d.removeListener(co)
}
}
}
if res.action == doEnd || len(co.deps) == 0 && len(co.inners) == 0 {
co.end()
}
}
func (co *Coroutine) end() {
if co.flag&flagEnded != 0 {
return
}
co.flag |= flagEnded
co.clearDeps()
co.clearInners()
if co.flag&flagResumed == 0 {
co.executor.freeCoroutine(co)
}
}
func (co *Coroutine) clearDeps() {
deps := co.deps
for d := range deps {
delete(deps, d)
d.removeListener(co)
}
}
func (co *Coroutine) clearInners() {
inners := co.inners
co.inners = inners[:0]
for i := len(inners) - 1; i >= 0; i-- {
switch v := inners[i]; {
case v.co != nil:
// v.co could have been ended and recycled.
// We need the following check to confirm that v.co is still an inner Coroutine of co.
if v.co.outer == co {
v.co.end()
}
case v.f != nil:
v.f()
}
}
clear(inners)
}
// Executor returns the [Executor] that spawned co.
//
// Since co can be recycled by an Executor, it is recommended to save
// the return value in a variable first.
func (co *Coroutine) Executor() *Executor {
return co.executor
}
// Path returns the path of co.
//
// Since co can be recycled by an Executor, it is recommended to save
// the return value in a variable first.
func (co *Coroutine) Path() string {
return co.path
}
// Watch watches some Events so that, when any of them notifies, co resumes.
func (co *Coroutine) Watch(s ...Event) {
deps := co.deps
if deps == nil {
deps = make(map[Event]bool)
co.deps = deps
}
for _, d := range s {
if _, ok := deps[d]; ok {
deps[d] = true
continue
}
deps[d] = true
d.addListener(co)
}
}
// Defer adds a function call when co resumes or ends, or when co is switching
// to work on another [Task].
func (co *Coroutine) Defer(f func()) {
co.inners = append(co.inners, coroutineOrFunc{f: f})
}
// Spawn creates an inner [Coroutine] to work on t, using the result of
// path.Join(co.Path(), p) as its path.
//
// Inner Coroutines are ended automatically when the outer one resumes or
// ends, or when the outer one is switching to work on another Task.
func (co *Coroutine) Spawn(p string, t Task) {
inner := co.executor.newCoroutine().init(co.executor, path.Join(co.path, p), t).recyclable()
inner.run()
if inner.flag&flagEnded == 0 {
inner.outer = co
co.inners = append(co.inners, coroutineOrFunc{co: inner})
}
}
// Result is the type of the return value of a [Task] function.
// A Result determines what next for a [Coroutine] to do after calling
// a Task function.
//
// A Result can be created by calling one of the following methods of
// Coroutine:
// - [Coroutine.End]: for ending a Coroutine;
// - [Coroutine.Await]: for yielding a Coroutine with additional Events to
// watch;
// - [Coroutine.Yield]: for yielding a Coroutine with another Task to which
// will be switched later when resuming;
// - [Coroutine.Switch]: for switching to another Task.
type Result struct {
action int
switchTo Task
}
// End returns a [Result] that will cause co to end or switch to work on
// another [Task] in a [Chain].
func (co *Coroutine) End() Result {
return Result{action: doEnd}
}
// Await returns a [Result] that will cause co to yield.
// Await also accepts additional Events to be awaited for.
func (co *Coroutine) Await(s ...Event) Result {
if len(s) != 0 {
co.Watch(s...)
}
return Result{action: doYield}
}
// Yield returns a [Result] that will cause co to yield.
// t becomes the current Task of co so that, when co is resumed, t is called
// instead.
func (co *Coroutine) Yield(t Task) Result {
if t == nil {
panic("async(Coroutine): undefined behavior: Yield(nil)")
}
return Result{action: doYield, switchTo: t}
}
// Switch returns a [Result] that will cause co to switch to work on t.
// co will be reset and t will be called immediately as the current Task of co.
func (co *Coroutine) Switch(t Task) Result {
if t == nil {
panic("async(Coroutine): undefined behavior: Switch(nil)")
}
return Result{action: doSwitch, switchTo: t}
}
// A Task is a piece of work that a [Coroutine] is given to do when it is
// spawned.
// The return value of a Task, a [Result], determines what next for a Coroutine
// to do.
//
// The argument co must not escape, because co can be recycled by an [Executor]
// when co ends.
type Task func(co *Coroutine) Result
// Chain returns a [Task] that will work on each of the provided Tasks
// in sequence.
// When one Task ends, Chain works on another.
func Chain(s ...Task) Task {
var t Task
return func(co *Coroutine) Result {
if t == nil {
if len(s) == 0 {
return co.End()
}
t, s = s[0], s[1:]
}
switch res := t(co); res.action {
case doEnd:
t = nil
return Result{action: doSwitch}
case doYield, doSwitch:
if res.switchTo != nil {
t = res.switchTo
}
return Result{action: res.action}
default:
panic("async: internal error: unknown action")
}
}
}
// Do returns a [Task] that calls f, and then ends.
func Do(f func()) Task {
return func(co *Coroutine) Result {
f()
return co.End()
}
}
// Never returns a [Task] that never ends.
// Tasks in a [Chain] after Never are never getting worked on.
func Never() Task {
return func(co *Coroutine) Result {
return co.Await()
}
}
// NoOperation returns a [Task] that ends without doing anything.
func NoOperation() Task {
return (*Coroutine).End
}
// Then returns a [Task] that first works on t, then switches to work on next
// after t ends.
//
// To chain multiple Tasks, use [Chain] function.
func (t Task) Then(next Task) Task {
if next == nil {
panic("async(Task): undefined behavior: Then(nil)")
}
return func(co *Coroutine) Result {
switch res := t(co); res.action {
case doEnd:
return Result{action: doSwitch, switchTo: next}
case doYield, doSwitch:
if res.switchTo != nil {
t = res.switchTo
}
return Result{action: res.action}
default:
panic("async: internal error: unknown action")
}
}
}