Skip to content

Rigorously specify and test pipeTo #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Oct 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 111 additions & 10 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -596,18 +596,103 @@ ReadableStream(<var>underlyingSource</var> = {}, { <var>size</var>, <var>highWat

Piping a stream will <a lt="locked to a reader">lock</a> it for the duration of the pipe, preventing any other
consumer from acquiring a reader.
</div>

The <code>pipeTo</code> method is still in some flux. Its design depends on the design of writable streams, which
<a href="#ws-not-ready-yet">are still undergoing spec churn</a>.
Errors and closures of the source and destination streams propagate as follows:

<ul>
<li><p>An error in the source <a>readable stream</a> will <a lt="abort a writable stream">abort</a> the destination
<a>writable stream</a>, unless <code>preventAbort</code> is truthy. The returned promise will be rejected with the
source's error, or with any error that occurs during aborting the destination.</p></li>

<li><p>An error in the destination <a>writable stream</a> will <a lt="cancel a readable stream">cancel</a> the
source <a>readable stream</a>, unless <code>preventCancel</code> is truthy. The returned promise will be rejected
with the destination's error, or with any error that occurs during canceling the source.</p></li>

<li><p>When the source <a>readable stream</a> closes, the destination <a>writable stream</a> will be closed, unless
<code>preventClose</code> is true. The returned promise will be fulfilled once this process completes, unless an
error is encountered while closing the destination, in which case it will be rejected with that error.</p></li>

<li><p>If the destination <a>writable stream</a> starts out closed or closing, the source <a>readable stream</a>
will be <a lt="cancel a readable stream">canceled</a>, unless <code>preventCancel</code> is true. The returned
promise will be rejected with an error indicating piping to a closed stream failed, or with any error that occurs
during canceling the source.</p></li>
</ul>
</div>

For now, the reference implementation and tests provide a guide to what this method is generally intended to do: <a
href="https://github.com/whatwg/streams/blob/master/reference-implementation/lib/readable-stream.js">reference-implementation/lib/readable-stream.js</a>,
look for the <code>pipeTo</code> method. In addition to changing as the writable stream design changes, one major
aspect of <code>pipeTo</code> not captured by the reference implementation is that it will operate via unobservable
abstract operation calls, instead of using the JavaScript-exposed readable and writable stream APIs. This will better
allow optimization and specialization. See <a href="https://github.com/whatwg/streams/issues/407">#407</a> and <a
href="https://github.com/whatwg/streams/issues/97">#97</a> for more information.
<emu-alg>
1. If ! IsReadableStream(*this*) is *false*, return a promise rejected with a *TypeError* exception.
1. If ! IsWritableStream(_dest_) is *false*, return a promise rejected with a *TypeError* exception.
1. Set _preventClose_ to ! ToBoolean(_preventClose_), set _preventAbort_ to ! ToBoolean(_preventAbort_), and set
_preventCancel_ to ! ToBoolean(_preventCancel_).
1. If ! IsReadableStreamLocked(*this*) is *true*, return a promise rejected with a *TypeError* exception.
1. If ! IsWritableStreamLocked(_dest_) is *true*, return a promise rejected with a *TypeError* exception.
1. If ! IsReadableByteStreamController(*this*.[[readableStreamController]]) is true, let _reader_ be either !
AcquireReadableStreamBYOBReader(*this*) or ! AcquireReadableStreamDefaultReader(*this*), at the user agent's
discretion.
1. Otherwise, let _reader_ be ! AcquireReadableStreamDefaultReader(*this*).
1. Let _writer_ be ! AcquireWritableStreamDefaultWriter(_dest_).
1. Let _shuttingDown_ be *false*.
1. Let _promise_ be <a>a new promise</a>.
1. <a>In parallel</a>, using _reader_ and _writer_, read all <a>chunks</a> from *this* and write them to _dest_. Due
to the locking provided by the reader and writer, the exact manner in which this happens is not observable to
author code, and so there is flexibility in how this is done. The following constraints apply regardless of the
exact algorithm used:
* <strong>Public API must not be used:</strong> while reading or writing, or performing any of the operations
below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes)
must not be used. Instead, the streams must be manipulated directly.
* <strong>Backpressure must be enforced:</strong>
* While WritableStreamDefaultWriterGetDesiredSize(_writer_) is ≤ *0* or is *null*, the user agent must not read
from _reader_.
* If _reader_ is a <a>BYOB reader</a>, WritableStreamDefaultWriterGetDesiredSize(_writer_) should be used to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put some note here why we have the if-otherwise but recommending the same method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't quite understand. What were you thinking of adding?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh, never mind. Sorry.

determine the size of the chunks read from _reader_.
* Otherwise, WritableStreamDefaultWriterGetDesiredSize(_writer_) may be used to determine the flow rate
heuristically, e.g. by delaying reads while it is judged to be "low" compared to the size of chunks that have
been typically read.
* <strong>Shutdown must stop all activity:</strong> if _shuttingDown_ becomes *true*, the user agent must not
initiate further reads from _reader_ or writes to _writer_. (Ongoing reads and writes may finish.) In particular,
the user agent must check the below conditions on *this*.[[state]] and _dest_.[[state]] before performing any
reads or writes, since they might lead to immediate shutdown.
* <strong>Errors must be propagated forward:</strong> if *this*.[[state]] is or becomes `"errored"`, then
1. If _preventAbort_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
WritableStreamAbort(_dest_, *this*.[[storedError]]) and with *this*.[[storedError]].
1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with *this*.[[storedError]].
* <strong>Errors must be propagated backward:</strong> if _dest_.[[state]] is or becomes `"errored"`, then
1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
ReadableStreamCancel(*this*, _dest_.[[storedError]]) and with _dest_.[[storedError]].
1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with _dest_.[[storedError]].
* <strong>Closing must be propagated forward:</strong> if *this*.[[state]] is or becomes `"closed"`, then
1. If _preventClose_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
WritableStreamDefaultWriterCloseWithErrorPropagation(_writer_).
1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a>.
* <strong>Closing must be propagated backward:</strong> if _dest_.[[state]] is `"closing"` or `"closed"`, then
1. Let _destClosed_ be a new *TypeError*.
1. If _preventCancel_ is *false*, <a href="#rs-pipeTo-shutdown-with-action">shutdown with an action</a> of !
ReadableStreamCancel(*this*, _destClosed_) and with _destClosed_.
1. Otherwise, <a href="#rs-pipeTo-shutdown">shutdown</a> with _destClosed_.
* <i id="rs-pipeTo-shutdown-with-action">Shutdown with an action</i>: if any of the above requirements ask to
shutdown with an action _action_, optionally with an error _originalError_, then:
1. If _shuttingDown_ is *true*, abort these substeps.
1. Set _shuttingDown_ to *true*.
1. Wait until any ongoing write finishes (i.e. the corresponding promises settle).
1. Let _p_ be the result of performing _action_.
1. <a>Upon fulfillment</a> of _p_, <a href="#rs-pipeTo-finalize">finalize</a>, passing along _originalError_ if
it was given.
1. <a>Upon rejection</a> of _p_ with reason _newError_, <a href="#rs-pipeTo-finalize">finalize</a> with
_newError_.
* <i id="rs-pipeTo-shutdown">Shutdown</i>: if any of the above requirements or steps ask to shutdown, optionally
with an error _error_, then:
1. If _shuttingDown_ is *true*, abort these substeps.
1. Set _shuttingDown_ to *true*.
1. Wait until any ongoing write finishes (i.e. the corresponding promises settle).
1. <a href="#rs-pipeTo-finalize">Finalize</a>, passing along _error_ if it was given.
* <i id="rs-pipeTo-finalize">Finalize</i>: both forms of shutdown will eventually ask to finalize, optionally with
an error _error_, which means to perform the following steps:
1. Perform ! WritableStreamDefaultWriterRelease(_writer_).
1. Perform ! ReadableStreamReaderGenericRelease(_reader_).
1. If _error_ was given, <a>reject</a> _promise_ with _error_.
1. Otherwise, <a>resolve</a> _promise_ with *undefined*.
1. Return _promise_.
</emu-alg>

<h5 id="rs-tee" method for="ReadableStream">tee()</h5>

Expand Down Expand Up @@ -2995,6 +3080,22 @@ nothrow>WritableStreamDefaultWriterClose ( <var>writer</var> )</h4>
1. Return _promise_.
</emu-alg>

<h4 id="writable-stream-default-writer-close-with-error-propagation" aoid="WritableStreamDefaultWriterCloseWithErrorPropagation"
nothrow>WritableStreamDefaultWriterCloseWithErrorPropagation ( <var>writer</var> )</h4>

<p class="note">This abstract operation helps implement the error propagation semantics of
{{ReadableStream/pipeTo()}}.</p>

<emu-alg>
1. Let _stream_ be _writer_.[[ownerWritableStream]].
1. Assert: _stream_ is not *undefined*.
1. Let _state_ be _stream_.[[state]].
1. If _state_ is `"closing"` or `"closed"`, return <a>a promise resolved with</a> *undefined*.
1. If _state_ is `"errored"`, return <a>a promise rejected with</a> _stream_.[[storedError]].
1. Assert: _state_ is `"writable"`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add this assert to the reference implementation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

1. Return ! WritableStreamDefaultWriterClose(_writer_).
</emu-alg>

<h4 id="writable-stream-default-writer-get-desired-size" aoid="WritableStreamDefaultWriterGetDesiredSize"
nothrow>WritableStreamDefaultWriterGetDesiredSize ( <var>writer</var> )</h4>

Expand Down
Loading