Skip to content

Commit 1f32890

Browse files
committed
Specify and test pipeTo rigorously
This commit introduces an actual spec for pipeTo, now that writable streams are nailed down. The algorithm explicitly notes that much of the actual data movement is unobservable due to locking, and as such just specifies some requirements (mostly around error and close propagation) for how the piping must happen. The reference implementation pipeTo is rewritten to more explicitly reflect this algorithm and its requirements. All pipeTo tests have been converted to web platform tests format, with cleanups as that was done. Some coverage was lost by not using the templating machinery, but some was gained by testing the options more exhaustively in various scenarios. One flow control test (the complicated one with the many different times at which things are tested) changed in expectation, in a positive way. We now explicitly prohibit reading while the writable stream is not ready, whereas the previous algorithm would read a chunk from the readable stream and keep it in "limbo" between the two streams. The new formulation propagates backpressure better; the readable stream's queue is not alleviated prematurely, so backpressure propagates more reliably.
1 parent e69528c commit 1f32890

24 files changed

+2281
-1868
lines changed

index.bs

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -603,16 +603,72 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat
603603
consumer from acquiring a reader.
604604
</div>
605605

606-
The <code>pipeTo</code> method is still in some flux. Its design depends on the design of writable streams, which
607-
<a href="#ws-not-ready-yet">are still undergoing spec churn</a>.
608-
609-
For now, the reference implementation and tests provide a guide to what this method is generally intended to do: <a
610-
href="https://github.com/whatwg/streams/blob/master/reference-implementation/lib/readable-stream.js">reference-implementation/lib/readable-stream.js</a>,
611-
look for the <code>pipeTo</code> method. In addition to changing as the writable stream design changes, one major
612-
aspect of <code>pipeTo</code> not captured by the reference implementation is that it will operate via unobservable
613-
abstract operation calls, instead of using the JavaScript-exposed readable and writable stream APIs. This will better
614-
allow optimization and specialization. See <a href="https://github.com/whatwg/streams/issues/407">#407</a> and <a
615-
href="https://github.com/whatwg/streams/issues/97">#97</a> for more information.
606+
<emu-alg>
607+
1. If ! IsReadableStream(*this*) is *false*, return a promise rejected with a *TypeError* exception.
608+
1. If ! IsWritableStream(_dest_) is *false*, return a promise rejected with a *TypeError* exception.
609+
1. Set _preventClose_ to ? ToBoolean(_preventClose_), set _preventAbort_ to ? ToBoolean(_preventAbort_), and set
610+
_preventCancel_ to ? ToBoolean(_preventCancel_).
611+
1. If ! IsReadableByteStreamController(*this*.[[readableStreamController]]) is true, let _reader_ be either ?
612+
AcquireReadableStreamBYOBReader(*this*) or ? AcquireReadableStreamDefaultReader(*this*), at the user agent's
613+
discretion.
614+
1. Otherwise, let _reader_ be ? AcquireReadableStreamDefaultReader(*this*).
615+
1. Let _writer_ be ? AcquireWritableStreamDefaultWriter(_dest_).
616+
1. Let _promise_ be a new promise.
617+
1. Let _shuttingDown_ be *false*.
618+
1. <a>In parallel</a>, using _reader_ and _writer_, read all <a>chunks</a> from *this* and write them to _dest_. Due
619+
to the locking provided by the reader and writer, the exact manner in which this happens is not observable to
620+
author code, and so there is flexibility in how this is done. The following constraints apply regardless of the
621+
exact algorithm used:
622+
* <strong>Public API must not be used:</strong> while reading or writing, or performing any of the operations
623+
below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes)
624+
must not be used. Instead, the streams must be manipulated directly.
625+
* <strong>Backpressure must be enforced:</strong>
626+
* While WritableStreamDefaultWriterGetDesiredSize(_writer_) is ≤ *0* or is *null*, the user agent must not read
627+
from _reader_.
628+
* If _reader_ is a <a>BYOB reader</a>, WritableStreamDefaultWriterGetDesiredSize(_writer_) should be used to
629+
determine the size of the chunks read from _reader_.
630+
* Otherwise, WritableStreamDefaultWriterGetDesiredSize(_writer_) may be used to determine the flow rate
631+
heuristically, e.g. by delaying reads while it is judged to be "low" compared to the size of chunks that have
632+
been typically read.
633+
* <strong>Shutdown must stop all activity:</strong> if _shuttingDown_ becomes *true*, the user agent must not
634+
initiate further reads from _reader_ or writes to _writer_. (Ongoing reads and writes may finish.)
635+
* <strong>Errors must be propagated forward:</strong> if *this*.[[state]] is or becomes `"errored"`, then
636+
1. If _preventAbort_ is *false*, <a href="#rs-pipeTo-async-shutdown">perform async shutdown</a> given the action
637+
of performing ! WritableStreamAbort(_dest_, *this*.[[storedError]]), and given *this*.[[storedError]].
638+
1. Otherwise, <a href="#rs-pipeTo-shutdown">perform shutdown</a> with *this*.[[storedError]].
639+
* <strong>Errors must be propagated backward:</strong> if _dest_.[[state]] is or becomes `"errored"`, then
640+
1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-async-shutdown">perform async shutdown</a> given the action
641+
of performing ! ReadableStreamCancel(*this*, _dest_.[[storedError]]), and given _dest_.[[storedError]].
642+
1. Otherwise, <a href="#rs-pipeTo-shutdown">perform shutdown</a> with _dest_.[[storedError]].
643+
* <strong>Closing must be propagated forward:</strong> if *this*.[[state]] is or becomes `"closed"`, then
644+
1. If _preventClose_ is *false*, <a href="#rs-pipeTo-async-shutdown">perform async shutdown</a> given the action
645+
of performing ! WritableStreamDefaultWriterCloseWithErrorPropagation(_writer_).
646+
1. Otherwise, <a href="#rs-pipeTo-shutdown">perform shutdown</a>.
647+
* <strong>Closing must be propagated backward:</strong> if _dest_.[[state]] is `"closing"` or `"closed"`, then
648+
1. Let _destClosed_ be a new *TypeError*.
649+
1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-async-shutdown">Perform async shutdown</a> given the action
650+
of performing ! ReadableStreamCancel(*this*, _destClosed_), and given and _destClosed_.
651+
1. Otherwise, <a href="#rs-pipeTo-shutdown">perform shutdown</a> with _destClosed_.
652+
* <i id="rs-pipeTo-async-shutdown">Async shutdown</i>: if any of the above requirements ask to perform async
653+
shutdown, given an action _action_ and an optional error _originalError_, then:
654+
1. If _shuttingDown_ is *true*, abort these substeps.
655+
1. Set _shuttingDown_ to *true*.
656+
1. Wait until any ongoing write finishes (i.e. the corresponding promises settle).
657+
1. Let _p_ be the result of performing _action_.
658+
1. If _p_ fulfills,
659+
1. If _originalError_ was given, <a href="#rs-pipeTo-shutdown">perform shutdown</a> with _originalError_.
660+
1. Otherwise, <a href="#rs-pipeTo-shutdown">perform shutdown</a>.
661+
1. If _p_ rejects with reason _newError_, <a href="#rs-pipeTo-shutdown">perform shutdown</a> with _newError_.
662+
* <i id="rs-pipeTo-shutdown">Shutdown</i>: if any of the above requirements or steps ask to perform shutdown, given
663+
an optional error _error_, then:
664+
1. Set _shuttingDown_ to *true*.
665+
1. Wait until any ongoing write finishes (i.e. the corresponding promises settle).
666+
1. Perform ! WritableStreamDefaultWriterRelease(_writer_).
667+
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
668+
1. If _error_ was given, reject _promise_ with the given error.
669+
1. Otherwise, resolve _promise_ with *undefined*.
670+
1. Return _promise_.
671+
</emu-alg>
616672

617673
<h5 id="rs-tee" method for="ReadableStream">tee()</h5>
618674

@@ -3017,6 +3073,22 @@ nothrow>WritableStreamDefaultWriterClose ( <var>writer</var> )</h4>
30173073
1. Return _promise_.
30183074
</emu-alg>
30193075

3076+
<h4 id="writable-stream-default-writer-close-with-error-propagation" aoid="WritableStreamDefaultWriterCloseWithErrorPropagation"
3077+
nothrow>WritableStreamDefaultWriterCloseWithErrorPropagation ( <var>writer</var> )</h4>
3078+
3079+
<p class="note">This abstract operation helps implement the error propagation semantics of
3080+
{{ReadableStream/pipeTo()}}.</p>
3081+
3082+
<emu-alg>
3083+
1. Let _stream_ be _writer_.[[ownerWritableStream]].
3084+
1. Assert: _stream_ is not *undefined*.
3085+
1. Let _state_ be _stream_.[[state]].
3086+
1. If _state_ is `"closed"` or `"closing"`, return a new promise resolved with *undefined*.
3087+
1. If _state_ is `"errored"`, return a new promise rejected with _stream_.[[storedError]].
3088+
1. Assert: _state_ is `"writable"`.
3089+
1. Return ! WritableStreamDefaultWriterClose(_writer_).
3090+
</emu-alg>
3091+
30203092
<h4 id="writable-stream-default-writer-get-desired-size" aoid="WritableStreamDefaultWriterGetDesiredSize"
30213093
nothrow>WritableStreamDefaultWriterGetDesiredSize ( <var>writer</var> )</h4>
30223094

0 commit comments

Comments
 (0)