Allow sync task.{wait,yield,poll} and {stream,future}.{read,write}
lukewagner committed Oct 25, 2024
1 parent 8159172 commit fd9fbf1
Showing 5 changed files with 178 additions and 152 deletions.
6 changes: 3 additions & 3 deletions design/mvp/Binary.md
@@ -289,9 +289,9 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
           | 0x06 => (canon thread.hw_concurrency (core func)) 🧵
           | 0x08 => (canon task.backpressure (core func)) 🔀
           | 0x09 ft:<core:typeidx> => (canon task.return ft (core func)) 🔀
-          | 0x0a m:<core:memidx> => (canon task.wait (memory m) (core func)) 🔀
-          | 0x0b m:<core:memidx> => (canon task.poll (memory m) (core func)) 🔀
-          | 0x0c => (canon task.yield (core func)) 🔀
+          | 0x0a async?:<async>? m:<core:memidx> => (canon task.wait async? (memory m) (core func)) 🔀
+          | 0x0b async?:<async>? m:<core:memidx> => (canon task.poll async? (memory m) (core func)) 🔀
+          | 0x0c async?:<async>? => (canon task.yield async? (core func)) 🔀
           | 0x0d => (canon waitable.drop (core func)) 🔀
           | 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
           | 0x0f t:<typeidx> opts:<opts> => (canon stream.read t opts (core func)) 🔀
153 changes: 85 additions & 68 deletions design/mvp/CanonicalABI.md
@@ -528,21 +528,34 @@ The conditions in `may_enter` ensure two invariants:
 
 The `wait_on` method, called by `wait` and `yield_` below, blocks the
 current task until the given future is resolved, allowing other tasks to make
-progress in the meantime. While blocked, another asynchronous task can make a
-synchronous import call via `call_sync`, in which case, to preserve
-synchronicity, `wait_on` must wait until the synchronous import call is
-finished (signalled by `interrupt` being re-set).
-```python
-  async def wait_on(self, f):
+progress in the meantime. If called with `sync` set, `interruptible` is
+cleared to ensure that no other tasks are allowed to start or resume,
+emulating a traditional synchronous system call. If `sync` is not set, then
+it's possible that, between blocking and resuming, some other task executed
+and cleared `interruptible`, and thus `wait_on` must wait until
+`interruptible` is set again. If `interruptible` is already clear when
+`wait_on` is called, then the current task is already part of a synchronous
+call and so there's nothing extra to do.
+```python
+  async def wait_on(self, sync, f):
     self.maybe_start_pending_task()
     if self.inst.interruptible.is_set():
+      if sync:
+        self.inst.interruptible.clear()
       v = await self.on_block(f)
-      while not self.inst.interruptible.is_set():
-        await self.on_block(self.inst.interruptible.wait())
+      if sync:
+        self.inst.interruptible.set()
+      else:
+        while not self.inst.interruptible.is_set():
+          await self.on_block(self.inst.interruptible.wait())
     else:
       v = await self.on_block(f)
     return v
+```
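To build intuition for the two modes, here is a minimal standalone model of the `interruptible` protocol in plain `asyncio`. This is an illustration only, not spec code: it omits `maybe_start_pending_task` and the `on_block` plumbing, and the sleep durations are arbitrary.

```python
import asyncio

# Standalone model of the `interruptible` protocol (illustration only):
# a sync wait holds `interruptible` clear for its whole duration, while an
# async wait whose future completes inside someone else's sync window must
# re-wait for `interruptible` before resuming.
async def wait_on(interruptible, sync, f):
    if sync:
        interruptible.clear()           # freeze other tasks in this instance
        v = await f
        interruptible.set()
    else:
        v = await f
        while not interruptible.is_set():
            await interruptible.wait()  # blocked behind a sync window
    return v

async def main():
    interruptible = asyncio.Event()
    interruptible.set()
    # The async wait's future completes first (0.01s < 0.02s), but it cannot
    # resume until the sync wait re-sets `interruptible` at 0.02s.
    await asyncio.gather(
        wait_on(interruptible, True, asyncio.sleep(0.02)),
        wait_on(interruptible, False, asyncio.sleep(0.01)))

asyncio.run(main())
```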

+A task can also make a synchronous call (to a `canon` built-in or another
+component) via `call_sync` which, like `wait_on`, clears the `interruptible`
+flag to block new tasks from starting or resuming.
+```python
   async def call_sync(self, callee, *args):
     if self.inst.interruptible.is_set():
       self.inst.interruptible.clear()
@@ -551,19 +564,15 @@ finished (signalled by `interrupt` being re-set).
     else:
       await callee(*args, self.on_block)
 ```
-If `wait_on` or `call_sync` are called when `interruptible` is *initially*
-clear, then the current task must have been created for a synchronously-lifted
-export call and so there are no other tasks to worry about and thus `wait_on`
-*must not* wait for `interrupt` to be re-set (which won't happen until the
-current task finishes via `exit`, defined below).
 
 While a task is running, it may call `wait` (via `canon task.wait` or, when
 using a `callback`, by returning to the event loop) to learn about progress
-made by async subtasks which are reported to this task by `notify`.
+made by async subtasks, streams or futures which are reported to this task
+by `notify`.
 ```python
-  async def wait(self) -> EventTuple:
+  async def wait(self, sync) -> EventTuple:
     while True:
-      await self.wait_on(self.has_events.wait())
+      await self.wait_on(sync, self.has_events.wait())
       if (e := self.maybe_next_event()):
         return e
 
@@ -585,19 +594,20 @@ flexibility allows multiple redundant events to be collapsed into one (e.g.,
 when a `Subtask` advances `CallState` multiple times before the event enqueued
 by the initial state change is delivered) and also for events to be
 retroactively removed (e.g., when a `stream.cancel-read` "steals" a pending
-`STREAM_READ` event that was enqueued but not yet delivered). Although this
-Python code represents `events` as a list of closures, an optimizing
-implementation should be able to avoid dynamically allocating this list and
-instead represent `events` as a linked list embedded in the elements of the
-`waitables` table (noting that, by design, any given `waitables` element can be
-in the `events` list at most once).
+`STREAM_READ` event that was enqueued but not yet delivered).
+
+Although this Python code represents `events` as an `asyncio.Queue` of
+closures, an optimizing implementation should be able to avoid dynamically
+allocating anything and instead represent `events` as a linked list embedded
+in the elements of the `waitables` table (noting that, by design, any given
+`waitables` element can be in the `events` list at most once).
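As a sketch of that optimization (illustrative only; the `Waitable` and `EventList` names and the single intrusive `next` field are assumptions, not spec API), the event "queue" can be threaded through the table elements themselves:

```python
# Illustrative sketch (not spec code): an intrusive event list threaded
# through the waitables table, so enqueueing an event allocates nothing.
# Because each waitable can be in the list at most once, one `next` field
# per element suffices; retraction just nulls the element's event closure.
class Waitable:
    def __init__(self):
        self.event = None   # closure returning an EventTuple, or None
        self.next = None    # link to the next waitable with a pending event

class EventList:
    def __init__(self):
        self.head = self.tail = None

    def enqueue(self, w, make_event):
        w.event = make_event   # collapses redundant events on the same waitable
        if w is self.head or w is self.tail or w.next is not None:
            return             # already linked (at most once by design)
        if self.tail:
            self.tail.next = w
        else:
            self.head = w
        self.tail = w

    def dequeue(self):
        while self.head:
            w = self.head
            self.head = w.next
            if self.head is None:
                self.tail = None
            w.next = None
            ev, w.event = w.event, None
            if ev and (e := ev()) is not None:
                return e       # skip retracted events (closure returned None)
        return None
```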

 A task may also cooperatively yield (via `canon task.yield`), allowing the
 runtime to switch execution to another task without having to wait for any
 external I/O (as emulated in the Python code by awaiting `sleep(0)`):
 ```python
-  async def yield_(self):
-    await self.wait_on(asyncio.sleep(0))
+  async def yield_(self, sync):
+    await self.wait_on(sync, asyncio.sleep(0))
 ```

 Putting these together, a task may also poll (via `canon task.poll`) for an
@@ -606,8 +616,8 @@ Importantly, `poll` starts by yielding execution (to avoid unintentionally
 starving other tasks) which means that the code calling `task.poll` must
 assume other tasks can execute, just like with `task.wait`.
 ```python
-  async def poll(self) -> Optional[EventTuple]:
-    await self.yield_()
+  async def poll(self, sync) -> Optional[EventTuple]:
+    await self.yield_(sync)
     return self.maybe_next_event()
 ```

@@ -2596,10 +2606,10 @@ async def canon_lift(opts, inst, ft, callee, caller, on_start, on_return, on_block
       is_yield = bool(packed_ctx & 1)
       ctx = packed_ctx & ~1
       if is_yield:
-        await task.yield_()
+        await task.yield_(sync = False)
         event, p1, p2 = (EventCode.YIELDED, 0, 0)
       else:
-        event, p1, p2 = await task.wait()
+        event, p1, p2 = await task.wait(sync = False)
       [packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, p1, p2])
   task.exit()
 ```
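For orientation, here is a hypothetical guest-side callback sketched in the same Python style (a real guest would be core wasm; the stubs below and the meaning of returning `0` are assumptions, not spec text). It exercises the packing contract used by the loop above: the continuation context in the upper bits, the yield request in the LSB.

```python
from enum import IntEnum

class EventCode(IntEnum):   # stub: only the code used in this sketch
    YIELDED = 0

CTX = 0x1000                # made-up continuation context (LSB kept clear)
pending = 3                 # pretend three events remain to be handled

def my_callback(ctx, event, p1, p2):
    global pending
    if event == EventCode.YIELDED:
        return CTX          # resume waiting after the requested yield
    pending -= 1            # handle one subtask/stream/future event
    if pending > 0:
        return CTX | 1      # LSB set: the event loop runs task.yield_ first
    return 0                # assumption: returning 0 exits the event loop
```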
Expand Down Expand Up @@ -2865,7 +2875,7 @@ required here.

For a canonical definition:
```wasm
(canon task.wait (core func $f))
(canon task.wait $async? (memory $mem) (core func $f))
```
validation specifies:
* `$f` is given type `(func (param i32) (result i32))`
@@ -2874,30 +2884,31 @@ Calling `$f` waits for progress to be made in a subtask of the current task,
 returning the event (which is currently simply a `CallState` value) and
 writing the subtask index as an outparam:
 ```python
-async def canon_task_wait(opts, task, ptr):
+async def canon_task_wait(sync, mem, task, ptr):
   trap_if(not task.inst.may_leave)
   trap_if(task.opts.callback is not None)
-  event, p1, p2 = await task.wait()
-  cx = LiftLowerContext(opts, None, None)
+  event, p1, p2 = await task.wait(sync)
+  cx = LiftLowerContext(CanonicalOptions(memory = mem), None, None)
   store(cx, p1, U32Type(), ptr)
   store(cx, p2, U32Type(), ptr + 4)
   return [event]
 ```
 The `trap_if` ensures that, when a component uses a `callback`, all events
 flow through the event loop at the base of the stack.
+If `async` is not set, no other tasks may execute during `task.wait`, which
+can be useful for producer toolchains in situations where interleaving is
+not supported. However, this is generally worse for concurrency and thus
+producer toolchains should set `async` when possible. When `async` is set,
+`task.wait` will only block the current `Task`, allowing other tasks to
+start or resume.
 
-Note that `task.wait` will only block the current `Task`, allowing other
-tasks to run. Note also that `task.wait` can be called from a
-synchronously-lifted export so that even synchronous code can make concurrent
-import calls. In these synchronous cases, though, the automatic backpressure
-(applied by `Task.enter`) will ensure there is only ever at most one
-synchronously-lifted task executing in a component instance at a time.
+`task.wait` can be called from a synchronously-lifted export so that even
+synchronous code can make concurrent import calls. In these synchronous
+cases, though, the automatic backpressure (applied by `Task.enter`) will
+ensure there is only ever at most one synchronously-lifted task executing in
+a component instance at a time.

 ### 🔀 `canon task.poll`
 
 For a canonical definition:
 ```wasm
-(canon task.poll (core func $f))
+(canon task.poll $async? (memory $mem) (core func $f))
 ```
 validation specifies:
 * `$f` is given type `(func (param i32) (result i32))`
@@ -2906,36 +2917,38 @@ Calling `$f` does a non-blocking check for whether an event is already
 available, returning whether or not there was such an event as a boolean and,
 if there was an event, storing the `i32` event and payloads as outparams.
 ```python
-async def canon_task_poll(opts, task, ptr):
+async def canon_task_poll(sync, mem, task, ptr):
   trap_if(not task.inst.may_leave)
-  ret = await task.poll()
+  ret = await task.poll(sync)
   if ret is None:
     return [0]
-  cx = LiftLowerContext(opts, None, None)
+  cx = LiftLowerContext(CanonicalOptions(memory = mem), None, None)
   store(cx, ret, TupleType([U32Type(), U32Type(), U32Type()]), ptr)
   return [1]
 ```
-Note that the `await` of `task.poll` indicates that `task.poll` can yield to
-other tasks (in this or other components) as part of polling for an event.
+When `async` is set, `task.poll` can yield to other tasks (in this or other
+components) as part of polling for an event.
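Since `poll` returns `None` when nothing is pending, a caller can drain every currently-available event without ever blocking. A sketch (assuming some `Task` instance `task`; not spec code):

```python
# Sketch: collect all events that are ready right now, never blocking.
# Each iteration still yields, so other tasks can run in between.
async def drain_events(task):
    events = []
    while (e := await task.poll(sync = False)) is not None:
        events.append(e)
    return events
```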

 ### 🔀 `canon task.yield`
 
 For a canonical definition:
 ```wasm
-(canon task.yield (core func $f))
+(canon task.yield $async? (core func $f))
 ```
 validation specifies:
 * `$f` is given type `(func)`
 
-Calling `$f` calls `Task.yield_`, trapping if called when there is a
-`callback`. (When there is a callback, yielding is achieved by returning with
-the LSB set.)
+Calling `$f` calls `Task.yield_` to allow other tasks to execute:
 ```python
-async def canon_task_yield(task):
+async def canon_task_yield(sync, task):
   trap_if(not task.inst.may_leave)
   trap_if(task.opts.callback is not None)
-  await task.yield_()
+  await task.yield_(sync)
   return []
 ```
+If `async` is not set, no other tasks *in the same component instance* can
+execute; however, tasks in *other* component instances may execute. This
+allows a long-running task in one component to avoid starving other
+components without needing to support full reentrancy.
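For example, a long-running computation can insert periodic yield points so that, even without any I/O, other components get a chance to run. A sketch (assuming `task` is the current `Task`; the chunk size is arbitrary):

```python
# Sketch: a compute loop with a cooperative yield point every 4096 items.
# sync = True blocks other tasks in this component instance while still
# letting tasks in other component instances run.
async def checksum(task, data):
    acc = 0
    for i, byte in enumerate(data):
        acc = (acc + byte) & 0xFFFFFFFF
        if i % 4096 == 4095:
            await task.yield_(sync = True)
    return acc
```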

 ### 🔀 `canon {stream,future}.new`
 
@@ -3022,21 +3035,25 @@ async def async_copy(HandleT, BufferT, t, opts, event_code, task, i, ptr, n):
   if h.stream.closed():
     flat_results = [CLOSED]
   else:
-    async def do_copy(on_block):
-      await h.copy(buffer, on_block)
-      def copy_event():
-        if h.copying_buffer is buffer:
-          h.copying_buffer = None
-          return (event_code, i, pack_async_copy_result(buffer, h))
-        else:
-          return None
-      h.call.task().notify(copy_event)
-    match await call_and_handle_blocking(do_copy):
-      case Blocked():
-        h.copying_buffer = buffer
-        flat_results = [BLOCKED]
-      case Returned():
-        flat_results = [pack_async_copy_result(buffer, h)]
+    if opts.sync:
+      await task.call_sync(h.copy, buffer)
+      flat_results = [pack_async_copy_result(buffer, h)]
+    else:
+      async def do_copy(on_block):
+        await h.copy(buffer, on_block)
+        def copy_event():
+          if h.copying_buffer is buffer:
+            h.copying_buffer = None
+            return (event_code, i, pack_async_copy_result(buffer, h))
+          else:
+            return None
+        h.call.task().notify(copy_event)
+      match await call_and_handle_blocking(do_copy):
+        case Blocked():
+          h.copying_buffer = buffer
+          flat_results = [BLOCKED]
+        case Returned():
+          flat_results = [pack_async_copy_result(buffer, h)]
   return flat_results
 ```
 The trap if `not h.call` prevents `write`s on the writable end of streams or
6 changes: 3 additions & 3 deletions design/mvp/Explainer.md
@@ -1369,9 +1369,9 @@ canon ::= ...
         | (canon resource.rep <typeidx> (core func <id>?))
         | (canon task.backpressure (core func <id>?)) 🔀
         | (canon task.return <core:typeidx> (core func <id>?)) 🔀
-        | (canon task.wait (memory <core:memidx>) (core func <id>?)) 🔀
-        | (canon task.poll (memory <core:memidx>) (core func <id>?)) 🔀
-        | (canon task.yield (core func <id>?)) 🔀
+        | (canon task.wait async? (memory <core:memidx>) (core func <id>?)) 🔀
+        | (canon task.poll async? (memory <core:memidx>) (core func <id>?)) 🔀
+        | (canon task.yield async? (core func <id>?)) 🔀
        | (canon stream.new <typeidx> (core func <id>?)) 🔀
        | (canon stream.read <typeidx> <canonopt>* (core func <id>?)) 🔀
        | (canon stream.write <typeidx> <canonopt>* (core func <id>?)) 🔀