Skip to content

Commit

Permalink
Add <typeidx> to {stream,future}.{read,write}
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Oct 22, 2024
1 parent 50cf971 commit 924f180
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 70 deletions.
8 changes: 4 additions & 4 deletions design/mvp/Binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,13 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x0c => (canon task.yield (core func)) 🔀
| 0x0d => (canon waitable.drop (core func)) 🔀
| 0x0e t:<typeidx> => (canon stream.new t (core func)) 🔀
| 0x0f => (canon stream.read (core func)) 🔀
| 0x10 => (canon stream.write (core func)) 🔀
| 0x0f t:<typeidx> => (canon stream.read t (core func)) 🔀
| 0x10 t:<typeidx> => (canon stream.write t (core func)) 🔀
| 0x11 async?:<async?> => (canon stream.cancel-read async? (core func)) 🔀
| 0x12 async?:<async?> => (canon stream.cancel-write async? (core func)) 🔀
| 0x13 t:<typeidx> => (canon future.new t (core func)) 🔀
| 0x14 => (canon future.read (core func)) 🔀
| 0x15 => (canon future.write (core func)) 🔀
| 0x14 t:<typeidx> => (canon future.read t (core func)) 🔀
| 0x15 t:<typeidx> => (canon future.write t (core func)) 🔀
| 0x16 async?:<async?> => (canon future.cancel-read async? (core func)) 🔀
| 0x17 async?:<async?> => (canon future.cancel-write async? (core func)) 🔀
async? ::= 0x00 =>
Expand Down
21 changes: 11 additions & 10 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -2968,29 +2968,30 @@ likelihood of deadlock), there is no synchronous option for `read` or `write`.
The actual copy happens via polymorphic dispatch to `copy`, which has been
defined above by the 4 `{Readable,Writable}{Stream,Future}Handle` types:
```python
async def canon_stream_read(task, i, ptr, n):
return await async_copy(ReadableStreamHandle, WritableBufferGuestImpl,
async def canon_stream_read(t, task, i, ptr, n):
return await async_copy(ReadableStreamHandle, WritableBufferGuestImpl, t,
EventCode.STREAM_READ, task, i, ptr, n)

async def canon_stream_write(task, i, ptr, n):
return await async_copy(WritableStreamHandle, ReadableBufferGuestImpl,
async def canon_stream_write(t, task, i, ptr, n):
return await async_copy(WritableStreamHandle, ReadableBufferGuestImpl, t,
EventCode.STREAM_WRITE, task, i, ptr, n)

async def canon_future_read(task, i, ptr):
return await async_copy(ReadableFutureHandle, WritableBufferGuestImpl,
async def canon_future_read(t, task, i, ptr):
return await async_copy(ReadableFutureHandle, WritableBufferGuestImpl, t,
EventCode.FUTURE_READ, task, i, ptr, 1)

async def canon_future_write(task, i, ptr):
return await async_copy(WritableFutureHandle, ReadableBufferGuestImpl,
async def canon_future_write(t, task, i, ptr):
return await async_copy(WritableFutureHandle, ReadableBufferGuestImpl, t,
EventCode.FUTURE_WRITE, task, i, ptr, 1)

async def async_copy(HandleT, BufferT, event_code, task, i, ptr, n):
async def async_copy(HandleT, BufferT, t, event_code, task, i, ptr, n):
trap_if(not task.inst.may_leave)
h = task.inst.waitables.get(i)
trap_if(not isinstance(h, HandleT))
trap_if(h.t != t)
trap_if(not h.cx)
trap_if(h.copying_buffer)
buffer = BufferT(h.cx, h.t, ptr, n)
buffer = BufferT(h.cx, t, ptr, n)
if h.stream.closed():
flat_results = [CLOSED]
else:
Expand Down
8 changes: 4 additions & 4 deletions design/mvp/Explainer.md
Original file line number Diff line number Diff line change
Expand Up @@ -1373,13 +1373,13 @@ canon ::= ...
| (canon task.poll (memory <core:memidx>) (core func <id>?)) 🔀
| (canon task.yield (core func <id>?)) 🔀
| (canon stream.new <typeidx> (core func <id>?)) 🔀
| (canon stream.read (core func <id>?)) 🔀
| (canon stream.write (core func <id>?)) 🔀
| (canon stream.read <typeidx> (core func <id>?)) 🔀
| (canon stream.write <typeidx> (core func <id>?)) 🔀
| (canon stream.cancel-read async? (core func <id>?)) 🔀
| (canon stream.cancel-write async? (core func <id>?)) 🔀
| (canon future.new <typeidx> (core func <id>?)) 🔀
| (canon future.read (core func <id>?)) 🔀
| (canon future.write (core func <id>?)) 🔀
| (canon future.read <typeidx> (core func <id>?)) 🔀
| (canon future.write <typeidx> (core func <id>?)) 🔀
| (canon future.cancel-read async? (core func <id>?)) 🔀
| (canon future.cancel-write async? (core func <id>?)) 🔀
| (canon waitable.drop (core func <id>?)) 🔀
Expand Down
21 changes: 11 additions & 10 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1855,29 +1855,30 @@ async def canon_future_new(t, task):

### 🔀 `canon {stream,future}.{read,write}`

async def canon_stream_read(task, i, ptr, n):
return await async_copy(ReadableStreamHandle, WritableBufferGuestImpl,
async def canon_stream_read(t, task, i, ptr, n):
return await async_copy(ReadableStreamHandle, WritableBufferGuestImpl, t,
EventCode.STREAM_READ, task, i, ptr, n)

async def canon_stream_write(task, i, ptr, n):
return await async_copy(WritableStreamHandle, ReadableBufferGuestImpl,
async def canon_stream_write(t, task, i, ptr, n):
return await async_copy(WritableStreamHandle, ReadableBufferGuestImpl, t,
EventCode.STREAM_WRITE, task, i, ptr, n)

async def canon_future_read(task, i, ptr):
return await async_copy(ReadableFutureHandle, WritableBufferGuestImpl,
async def canon_future_read(t, task, i, ptr):
return await async_copy(ReadableFutureHandle, WritableBufferGuestImpl, t,
EventCode.FUTURE_READ, task, i, ptr, 1)

async def canon_future_write(task, i, ptr):
return await async_copy(WritableFutureHandle, ReadableBufferGuestImpl,
async def canon_future_write(t, task, i, ptr):
return await async_copy(WritableFutureHandle, ReadableBufferGuestImpl, t,
EventCode.FUTURE_WRITE, task, i, ptr, 1)

async def async_copy(HandleT, BufferT, event_code, task, i, ptr, n):
async def async_copy(HandleT, BufferT, t, event_code, task, i, ptr, n):
trap_if(not task.inst.may_leave)
h = task.inst.waitables.get(i)
trap_if(not isinstance(h, HandleT))
trap_if(h.t != t)
trap_if(not h.cx)
trap_if(h.copying_buffer)
buffer = BufferT(h.cx, h.t, ptr, n)
buffer = BufferT(h.cx, t, ptr, n)
if h.stream.closed():
flat_results = [CLOSED]
else:
Expand Down
84 changes: 42 additions & 42 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,30 +1047,30 @@ async def core_func(task, args):
assert(rsi1 == 1)
[wsi1] = await canon_stream_new(U8Type(), task)
[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [wsi1])
[ret] = await canon_stream_read(task, rsi1, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi1, 0, 4)
assert(ret == 4)
assert(mem[0:4] == b'\x01\x02\x03\x04')
[wsi2] = await canon_stream_new(U8Type(), task)
retp = 12
[ret] = await canon_lower(opts, ft, host_import, task, [wsi2, retp])
assert(ret == 0)
rsi2 = mem[retp]
[ret] = await canon_stream_write(task, wsi2, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi2, 0, 4)
assert(ret == 4)
[ret] = await canon_stream_read(task, rsi2, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi2, 0, 4)
assert(ret == 4)
[ret] = await canon_stream_write(task, wsi1, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi1, 0, 4)
assert(ret == 4)
[ret] = await canon_stream_read(task, rsi1, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi1, 0, 4)
assert(ret == 4)
[ret] = await canon_stream_read(task, rsi1, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi1, 0, 4)
assert(ret == definitions.CLOSED)
assert(mem[0:4] == b'\x05\x06\x07\x08')
[ret] = await canon_stream_write(task, wsi2, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi2, 0, 4)
assert(ret == 4)
[ret] = await canon_stream_read(task, rsi2, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi2, 0, 4)
assert(ret == 4)
[ret] = await canon_stream_write(task, wsi1, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi1, 0, 4)
assert(ret == 4)
[] = await canon_waitable_drop(task, rsi1)
[] = await canon_waitable_drop(task, rsi2)
Expand Down Expand Up @@ -1120,7 +1120,7 @@ async def core_func(task, args):
assert(rsi1 == 1)
[wsi1] = await canon_stream_new(U8Type(), task)
[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [wsi1])
[ret] = await canon_stream_read(task, rsi1, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi1, 0, 4)
assert(ret == definitions.BLOCKED)
src_stream.write([1,2,3,4])
event, p1, p2 = await task.wait()
Expand All @@ -1135,16 +1135,16 @@ async def core_func(task, args):
assert(state == CallState.RETURNED)
rsi2 = mem[16]
assert(rsi2 == 4)
[ret] = await canon_stream_write(task, wsi2, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi2, 0, 4)
assert(ret == definitions.BLOCKED)
host_import_incoming.set_remain(100)
event, p1, p2 = await task.wait()
assert(event == EventCode.STREAM_WRITE)
assert(p1 == wsi2)
assert(p2 == 4)
[ret] = await canon_stream_read(task, rsi2, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi2, 0, 4)
assert(ret == 4)
[ret] = await canon_stream_write(task, wsi1, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi1, 0, 4)
assert(ret == definitions.BLOCKED)
dst_stream.set_remain(100)
event, p1, p2 = await task.wait()
Expand All @@ -1153,16 +1153,16 @@ async def core_func(task, args):
assert(p2 == 4)
src_stream.write([5,6,7,8])
src_stream.destroy_once_empty()
[ret] = await canon_stream_read(task, rsi1, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi1, 0, 4)
assert(ret == 4)
[ret] = await canon_stream_read(task, rsi1, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi1, 0, 4)
assert(ret == definitions.CLOSED)
[] = await canon_waitable_drop(task, rsi1)
assert(mem[0:4] == b'\x05\x06\x07\x08')
[ret] = await canon_stream_write(task, wsi2, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi2, 0, 4)
assert(ret == 4)
[] = await canon_waitable_drop(task, wsi2)
[ret] = await canon_stream_read(task, rsi2, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi2, 0, 4)
assert(ret == definitions.BLOCKED)
event, p1, p2 = await task.wait()
assert(event == EventCode.CALL_DONE)
Expand All @@ -1172,11 +1172,11 @@ async def core_func(task, args):
assert(event == EventCode.STREAM_READ)
assert(p1 == rsi2)
assert(p2 == 4)
[ret] = await canon_stream_read(task, rsi2, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi2, 0, 4)
assert(ret == definitions.CLOSED)
[] = await canon_waitable_drop(task, rsi2)
[] = await canon_waitable_drop(task, subi)
[ret] = await canon_stream_write(task, wsi1, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi1, 0, 4)
assert(ret == 4)
[] = await canon_waitable_drop(task, wsi1)
return []
Expand Down Expand Up @@ -1264,13 +1264,13 @@ async def core_func(task, args):
assert(ret == 0)
rsi = mem[retp]
assert(rsi == 1)
[ret] = await canon_stream_read(task, rsi, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi, 0, 4)
assert(ret == 2)
assert(mem[0:2] == b'\x01\x02')
[ret] = await canon_stream_read(task, rsi, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi, 0, 4)
assert(ret == 2)
assert(mem[0:2] == b'\x03\x04')
[ret] = await canon_stream_read(task, rsi, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi, 0, 4)
assert(ret == definitions.BLOCKED)
src.write([5,6])
event, p1, p2 = await task.wait()
Expand All @@ -1284,9 +1284,9 @@ async def core_func(task, args):
[ret] = await canon_lower(opts, sink_ft, host_sink, task, [wsi])
assert(ret == 0)
mem[0:6] = b'\x01\x02\x03\x04\x05\x06'
[ret] = await canon_stream_write(task, wsi, 0, 6)
[ret] = await canon_stream_write(U8Type(), task, wsi, 0, 6)
assert(ret == 2)
[ret] = await canon_stream_write(task, wsi, 2, 6)
[ret] = await canon_stream_write(U8Type(), task, wsi, 2, 6)
assert(ret == definitions.BLOCKED)
dst.set_remain(4)
event, p1, p2 = await task.wait()
Expand Down Expand Up @@ -1320,15 +1320,15 @@ async def core_func1(task, args):
await task.wait_on(fut1)

mem1[0:4] = b'\x01\x02\x03\x04'
[ret] = await canon_stream_write(task, wsi, 0, 2)
[ret] = await canon_stream_write(U8Type(), task, wsi, 0, 2)
assert(ret == 2)
[ret] = await canon_stream_write(task, wsi, 2, 2)
[ret] = await canon_stream_write(U8Type(), task, wsi, 2, 2)
assert(ret == 2)

await task.wait_on(fut2)

mem1[0:8] = b'\x05\x06\x07\x08\x09\x0a\x0b\x0c'
[ret] = await canon_stream_write(task, wsi, 0, 8)
[ret] = await canon_stream_write(U8Type(), task, wsi, 0, 8)
assert(ret == definitions.BLOCKED)

fut3.set_result(None)
Expand Down Expand Up @@ -1360,7 +1360,7 @@ async def core_func2(task, args):
rsi = mem2[0]
assert(rsi == 1)

[ret] = await canon_stream_read(task, rsi, 0, 8)
[ret] = await canon_stream_read(U8Type(), task, rsi, 0, 8)
assert(ret == definitions.BLOCKED)

fut1.set_result(None)
Expand All @@ -1375,16 +1375,16 @@ async def core_func2(task, args):
await task.wait_on(fut3)

mem2[0:8] = bytes(8)
[ret] = await canon_stream_read(task, rsi, 0, 2)
[ret] = await canon_stream_read(U8Type(), task, rsi, 0, 2)
assert(ret == 2)
assert(mem2[0:6] == b'\x05\x06\x00\x00\x00\x00')
[ret] = await canon_stream_read(task, rsi, 2, 2)
[ret] = await canon_stream_read(U8Type(), task, rsi, 2, 2)
assert(ret == 2)
assert(mem2[0:6] == b'\x05\x06\x07\x08\x00\x00')

await task.wait_on(fut4)

[ret] = await canon_stream_read(task, rsi, 0, 2)
[ret] = await canon_stream_read(U8Type(), task, rsi, 0, 2)
assert(ret == definitions.CLOSED)
[] = await canon_waitable_drop(task, rsi)

Expand All @@ -1408,14 +1408,14 @@ async def test_borrow_stream():
async def core_func1(task, args):
[rsi] = args

[ret] = await canon_stream_read(task, rsi, 4, 2)
[ret] = await canon_stream_read(BorrowType(rt), task, rsi, 4, 2)
assert(ret == definitions.BLOCKED)

event, p1, p2 = await task.wait()
assert(event == EventCode.STREAM_READ)
assert(p1 == rsi)
assert(p2 == 2)
[ret] = await canon_stream_read(task, rsi, 0, 2)
[ret] = await canon_stream_read(BorrowType(rt), task, rsi, 0, 2)
assert(ret == definitions.CLOSED)

[] = await canon_waitable_drop(task, rsi)
Expand Down Expand Up @@ -1449,7 +1449,7 @@ async def core_func2(task, args):
mem2[0] = h1
mem2[4] = h2

[ret] = await canon_stream_write(task, wsi, 0, 2)
[ret] = await canon_stream_write(BorrowType(rt), task, wsi, 0, 2)
assert(ret == 2)
[] = await canon_waitable_drop(task, wsi)

Expand Down Expand Up @@ -1491,7 +1491,7 @@ async def core_func(task, args):
[ret] = await canon_lower(lower_opts, host_ft1, host_func1, task, [wsi])
assert(ret == 0)
mem[0:4] = b'\x0a\x0b\x0c\x0d'
[ret] = await canon_stream_write(task, wsi, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi, 0, 4)
assert(ret == definitions.BLOCKED)
host_sink.set_remain(2)
got = await host_sink.consume(2)
Expand All @@ -1504,7 +1504,7 @@ async def core_func(task, args):
[ret] = await canon_lower(lower_opts, host_ft1, host_func1, task, [wsi])
assert(ret == 0)
mem[0:4] = b'\x01\x02\x03\x04'
[ret] = await canon_stream_write(task, wsi, 0, 4)
[ret] = await canon_stream_write(U8Type(), task, wsi, 0, 4)
assert(ret == definitions.BLOCKED)
host_sink.set_remain(2)
got = await host_sink.consume(2)
Expand All @@ -1517,7 +1517,7 @@ async def core_func(task, args):
[ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp])
assert(ret == 0)
rsi = mem[retp]
[ret] = await canon_stream_read(task, rsi, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi, 0, 4)
assert(ret == definitions.BLOCKED)
[ret] = await canon_stream_cancel_read(True, task, rsi)
assert(ret == 0)
Expand All @@ -1527,7 +1527,7 @@ async def core_func(task, args):
[ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp])
assert(ret == 0)
rsi = mem[retp]
[ret] = await canon_stream_read(task, rsi, 0, 4)
[ret] = await canon_stream_read(U8Type(), task, rsi, 0, 4)
assert(ret == definitions.BLOCKED)
host_source.eager_cancel.clear()
[ret] = await canon_stream_cancel_read(False, task, rsi)
Expand Down Expand Up @@ -1605,12 +1605,12 @@ async def core_func(task, args):
rfi = mem[retp]

readp = 0
[ret] = await canon_future_read(task, rfi, readp)
[ret] = await canon_future_read(U8Type(), task, rfi, readp)
assert(ret == definitions.BLOCKED)

writep = 8
mem[writep] = 42
[ret] = await canon_future_write(task, wfi, writep)
[ret] = await canon_future_write(U8Type(), task, wfi, writep)
assert(ret == 1)

event,p1,p2 = await task.wait()
Expand All @@ -1635,12 +1635,12 @@ async def core_func(task, args):
rfi = mem[retp]

readp = 0
[ret] = await canon_future_read(task, rfi, readp)
[ret] = await canon_future_read(U8Type(), task, rfi, readp)
assert(ret == definitions.BLOCKED)

writep = 8
mem[writep] = 42
[ret] = await canon_future_write(task, wfi, writep)
[ret] = await canon_future_write(U8Type(), task, wfi, writep)
assert(ret == 1)

event,p1,p2 = await task.wait()
Expand Down

0 comments on commit 924f180

Please sign in to comment.