Skip to content

Commit 5c19448

Browse files
committed
Relax stream.{read,write} to allow lengths of 0
1 parent d43430d commit 5c19448

File tree

4 files changed

+78
-20
lines changed

4 files changed

+78
-20
lines changed

design/mvp/Async.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,11 @@ These built-ins can either return immediately if >0 elements were able to be
325325
written or read immediately (without blocking) or return a sentinel "blocked"
326326
value indicating that the read or write will execute concurrently. The readable
327327
and writable ends of streams and futures can then be [waited](#waiting) on to
328-
make progress.
328+
make progress. Notification of progress signals *completion* of a read or write
329+
(i.e., the bytes have already been copied into the buffer). Additionally,
330+
*readiness* (to perform a read or write in the future) can be queried and
331+
signalled by performing a `0`-length read or write (see the [Stream State]
332+
section in the Canonical ABI explainer for details).
329333

330334
The `T` element type of streams and futures is optional, such that `future` and
331335
`stream` can be written in WIT without a trailing `<T>`. In this case, the

design/mvp/CanonicalABI.md

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,8 @@ class BufferGuestImpl(Buffer):
363363
length: int
364364

365365
def __init__(self, t, cx, ptr, length):
366-
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
367-
if t:
366+
trap_if(length > Buffer.MAX_LENGTH)
367+
if t and length > 0:
368368
trap_if(ptr != align_to(ptr, alignment(t)))
369369
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
370370
self.cx = cx
@@ -1178,14 +1178,34 @@ but in the opposite direction. Both are implemented by a single underlying
11781178
return 'blocked'
11791179
else:
11801180
ncopy = min(src.remain(), dst.remain())
1181-
assert(ncopy > 0)
1182-
dst.write(src.read(ncopy))
1183-
if self.pending_buffer.remain() > 0:
1184-
self.pending_on_partial_copy(self.reset_pending)
1181+
if ncopy > 0:
1182+
dst.write(src.read(ncopy))
1183+
if self.pending_buffer.remain() > 0:
1184+
self.pending_on_partial_copy(self.reset_pending)
1185+
else:
1186+
self.reset_and_notify_pending()
1187+
return 'done'
11851188
else:
1186-
self.reset_and_notify_pending()
1187-
return 'done'
1188-
```
1189+
if self.pending_buffer.remain() == 0:
1190+
self.reset_and_notify_pending()
1191+
if buffer.remain() == 0:
1192+
return 'done'
1193+
else:
1194+
self.pending_buffer = buffer
1195+
self.pending_on_partial_copy = on_partial_copy
1196+
self.pending_on_copy_done = on_copy_done
1197+
return 'blocked'
1198+
```
1199+
The meaning of a `read` or `write` when the length is `0` is that the caller
1200+
wants to know when the other side is "ready". When a non-`0`-length `read` or
1201+
`write` rendezvous with a `0`-length `write` or `read`, only the `0`-length
1202+
operation completes, keeping the non-`0`-length pending and ready for a future
1203+
rendezvous. In a rendezvous where *both* the `read` and `write` are `0`-length,
1204+
both operations complete. Thus, "readiness" does not guarantee "the next
1205+
operation will not block" and performing a `0`-length `read` or `write` doesn't
1206+
just *query* for readiness but also *signals* readiness to the other side.
1207+
Consequently, components should always follow a successful `0`-length `read` or
1208+
`write` with a non-`0`-length `read` or `write`.
11891209

11901210
Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
11911211
are actually stored in the `waitables` table. The classes are almost entirely

design/mvp/canonical-abi/definitions.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,8 @@ class BufferGuestImpl(Buffer):
311311
length: int
312312

313313
def __init__(self, t, cx, ptr, length):
314-
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
315-
if t:
314+
trap_if(length > Buffer.MAX_LENGTH)
315+
if t and length > 0:
316316
trap_if(ptr != align_to(ptr, alignment(t)))
317317
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
318318
self.cx = cx
@@ -702,13 +702,23 @@ def copy(self, buffer, on_partial_copy, on_copy_done, src, dst):
702702
return 'blocked'
703703
else:
704704
ncopy = min(src.remain(), dst.remain())
705-
assert(ncopy > 0)
706-
dst.write(src.read(ncopy))
707-
if self.pending_buffer.remain() > 0:
708-
self.pending_on_partial_copy(self.reset_pending)
705+
if ncopy > 0:
706+
dst.write(src.read(ncopy))
707+
if self.pending_buffer.remain() > 0:
708+
self.pending_on_partial_copy(self.reset_pending)
709+
else:
710+
self.reset_and_notify_pending()
711+
return 'done'
709712
else:
710-
self.reset_and_notify_pending()
711-
return 'done'
713+
if self.pending_buffer.remain() == 0:
714+
self.reset_and_notify_pending()
715+
if buffer.remain() == 0:
716+
return 'done'
717+
else:
718+
self.pending_buffer = buffer
719+
self.pending_on_partial_copy = on_partial_copy
720+
self.pending_on_copy_done = on_copy_done
721+
return 'blocked'
712722

713723
class StreamEnd(Waitable):
714724
stream: ReadableStream

design/mvp/canonical-abi/run_tests.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,8 +1457,19 @@ async def core_func1(task, args):
14571457
assert(mem1[retp+0] == wsi)
14581458
assert(mem1[retp+4] == 4)
14591459

1460+
[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
1461+
assert(ret == definitions.BLOCKED)
1462+
14601463
fut4.set_result(None)
14611464

1465+
[event] = await canon_waitable_set_wait(False, mem1, task, seti, retp)
1466+
assert(event == EventCode.STREAM_WRITE)
1467+
assert(mem1[retp+0] == wsi)
1468+
assert(mem1[retp+4] == 0)
1469+
1470+
[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
1471+
assert(ret == 0)
1472+
14621473
[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
14631474
[] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi)
14641475
[] = await canon_waitable_set_drop(task, seti)
@@ -1498,6 +1509,9 @@ async def core_func2(task, args):
14981509
fut2.set_result(None)
14991510
await task.on_block(fut3)
15001511

1512+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1513+
assert(ret == 0)
1514+
15011515
mem2[0:8] = bytes(8)
15021516
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
15031517
assert(ret == 2)
@@ -1508,9 +1522,19 @@ async def core_func2(task, args):
15081522

15091523
await task.on_block(fut4)
15101524

1511-
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
1525+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1526+
assert(ret == 0)
1527+
1528+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1529+
assert(ret == definitions.BLOCKED)
1530+
1531+
[event] = await canon_waitable_set_wait(False, mem2, task, seti, retp)
1532+
assert(event == EventCode.STREAM_READ)
1533+
assert(mem2[retp+0] == rsi)
1534+
p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False)
15121535
errctxi = 1
1513-
assert(ret == (definitions.CLOSED | errctxi))
1536+
assert(p2 == (definitions.CLOSED | errctxi))
1537+
15141538
[] = await canon_stream_close_readable(U8Type(), task, rsi)
15151539
[] = await canon_waitable_set_drop(task, seti)
15161540
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)

0 commit comments

Comments
 (0)