Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement teeing, plus a bunch of stage-setting tweaks #311

Merged
merged 7 commits into from
Apr 6, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Tweak spec mechanics of WritableStream Error Functions
Instead of creating a closure in the constructor, storing it on the stream, and calling this@[[error]] all the time, we can instead abstract out an ErrorWritableStream(stream, e) abstract operation and call that when appropriate. Then, the WritableStream error functions just delegate to this abstract operation.

The big upside of this is that, if the error function ends up not being used by the underlying sink (e.g. the underlying sink relies on returning rejected promises instead), we can immediately garbage-collect it, instead of carrying it around for the lifetime of the stream.

While doing so, make the editorial move to ES-spec-style closures, with internal slots, instead of using the "closing over" technique. It turns out that when you try to extend the "closing over" language to more complicated situations (as we will do for teeing), it falls over, becoming very confusing. The ES-spec version of closures is obtuse, but works.

Finally, we now put the definitions of closures underneath the place that creates them (in this case the constructor), instead of creating a whole new CreateWritableStreamErrorFunction abstract operation, defined some distance from the relevant function. This will serve us well for tee and pipe.
  • Loading branch information
domenic committed Apr 6, 2015
commit cde727e7c09ca3a3277e2078895731132043ca95
51 changes: 24 additions & 27 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,11 +1025,6 @@ Instances of <code>WritableStream</code> are created with the internal slots des
<td>A promise that becomes fulfilled when the stream becomes <code>"closed"</code>; returned by the
<code>closed</code> getter
</tr>
<tr>
<td>\[[error]]
<td>A <a>Writable Stream Error Function</a> created with the ability to move this stream to an
<code>"errored"</code> state
</tr>
<tr>
<td>\[[queue]]
<td>A List representing the stream's internal queue of pending writes
Expand Down Expand Up @@ -1117,15 +1112,26 @@ Instances of <code>WritableStream</code> are created with the internal slots des
1. Set *this*@[[queue]] to a new empty List.
1. Set *this*@[[state]] to "writable".
1. Set *this*@[[started]] and *this*@[[writing]] to *false*.
1. Set *this*@[[error]] to CreateWritableStreamErrorFunction(*this*).
1. Call-with-rethrow SyncWritableStreamStateWithQueue(*this*).
1. Let _startResult_ be InvokeOrNoop(_underlyingSink_, "start", «‍*this*@[[error]]»).
1. Let _error_ be a new <a><code>WritableStream</code> error function</a>.
1. Set _error_@[[stream]] to *this*.
1. Let _startResult_ be InvokeOrNoop(_underlyingSink_, "start", «_error_»).
1. ReturnIfAbrupt(_startResult_).
1. Set *this*@[[startedPromise]] to the result of resolving _startResult_ as a promise.
1. Upon fulfillment,
1. Set *this*@[[started]] to *true*.
1. Set *this*@[[startedPromise]] to *undefined*.
1. Upon rejection with reason _r_, call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_r_»).
1. Upon rejection with reason _r_, return ErrorWritableStream(*this*, _r_).
</pre>

A <dfn><code>WritableStream</code> error function</dfn> is an anonymous built-in function that is used to allow
<a>underlying sinks</a> to error their associated writable stream. Each <code>WritableStream</code> error function has
a \[[stream]] internal slot. When a <code>WritableStream</code> error function <var>F</var> is called with argument
<var>e</var>, it performs the following steps:

<pre is="emu-alg">
1. Let _stream_ be _F_@[[stream]].
1. Return ErrorWritableStream(_stream_, _e_).
</pre>

<h4 id="ws-prototype">Properties of the <code>WritableStream</code> Prototype</h4>
Expand Down Expand Up @@ -1201,7 +1207,7 @@ Instances of <code>WritableStream</code> are created with the internal slots des
1. If IsWritableStream(*this*) is *false*, return a promise rejected with a *TypeError* exception.
1. If *this*@[[state]] is "closed", return a new promise resolved with *undefined*.
1. If *this*@[[state]] is "errored", return a new promise rejected with *this*@[[storedError]].
1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍reason»).
1. Call-with-rethrow ErrorWritableStream(*this*, _reason_).
1. Let _sinkAbortPromise_ be PromiseInvokeOrFallbackOrNoop(*this*@[[underlyingSink]], "abort", «_reason_», "close", «»).
1. Return the result of transforming _sinkAbortPromise_ by a fulfillment handler that returns *undefined*.
</pre>
Expand Down Expand Up @@ -1247,24 +1253,24 @@ Instances of <code>WritableStream</code> are created with the internal slots des
1. Let _chunkSize_ be *1*.
1. Let _strategy_ be Get(*this*@[[underlyingSink]], "strategy").
1. If _strategy_ is an abrupt completion,
1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_strategy_.[[value]]»).
1. Call-with-rethrow ErrorWritableStream(*this*, _strategy_.[[value]]).
1. Return a new promise rejected with _strategy_.[[value]].
1. Set _strategy_ to _strategy_.[[value]].
1. If _strategy_ is not *undefined*, then
1. Set _chunkSize_ to Invoke(_strategy_, "size", «‍_chunk_»).
1. If _chunkSize_ is an abrupt completion,
1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_chunkSize_.[[value]]»).
1. Call-with-rethrow ErrorWritableStream(*this*, _chunkSize_.[[value]]).
1. Return a new promise rejected with _chunkSize_.[[value]].
1. Set _chunkSize_ to _chunkSize_.[[value]].
1. Let _promise_ be a new promise.
1. Let _writeRecord_ be Record{[[promise]]: _promise_, [[chunk]]: _chunk_}.
1. Let _enqueueResult_ be EnqueueValueWithSize(*this*@[[queue]], _writeRecord_, _chunkSize_).
1. If _enqueueResult_ is an abrupt completion,
1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_enqueueResult_.[[value]]»).
1. Call-with-rethrow ErrorWritableStream(*this*, _enqueueResult_.[[value]]).
1. Return a new promise rejected with _enqueueResult_.[[value]].
1. Let _syncResult_ be SyncWritableStreamStateWithQueue(*this*).
1. If _syncResult_ is an abrupt completion,
1. Call-with-rethrow Call(*this*@[[error]], *undefined*, «‍_syncResult_.[[value]]»).
1. Call-with-rethrow ErrorWritableStream(*this*, _syncResult_.[[value]]).
1. Return _promise_.
1. Call-with-rethrow CallOrScheduleWritableStreamAdvanceQueue(*this*).
1. Return _promise_.
Expand All @@ -1291,18 +1297,11 @@ Instances of <code>WritableStream</code> are created with the internal slots des
1. Assert: _stream_@[[state]] is "closing".
1. Resolve _stream_@[[closedPromise]] with *undefined*.
1. Set _stream_@[[state]] to "closed".
1. Upon rejection with reason _r_, call-with-rethrow Call(_stream_@[[error]], *undefined*, «‍_r_»).
1. Upon rejection with reason _r_, return ErrorWritableStream(_stream_, _r_).
1. Return *undefined*.
</pre>

<h4 id="create-writable-stream-error-function" aoid="CreateWritableStreamErrorFunction">CreateWritableStreamErrorFunction ( stream )</h4>

<pre is="emu-alg">
1. Return a new <a>Writable Stream Error Function</a> closing over _stream_.
</pre>

A <dfn>Writable Stream Error Function</dfn> is a built-in anonymous function of one argument <var>e</var>, closing over
a variable <var>stream</var>, that performs the following steps:
<h4 id="error-writable-stream" aoid="ErrorWritableStream">ErrorWritableStream ( stream, e )</h4>

<pre is="emu-alg">
1. If _stream_@[[state]] is "closed" or "errored", return *undefined*.
Expand All @@ -1313,6 +1312,7 @@ a variable <var>stream</var>, that performs the following steps:
1. If _stream_@[[state]] is "waiting", resolve _stream_@[[readyPromise]] with *undefined*.
1. Reject _stream_@[[closedPromise]] with _e_.
1. Set _stream_@[[state]] to "errored".
1. Return *undefined*.
</pre>

<h4 id="is-writable-stream" aoid="IsWritableStream">IsWritableStream ( x )</h4>
Expand Down Expand Up @@ -1363,9 +1363,9 @@ a variable <var>stream</var>, that performs the following steps:
1. Resolve _writeRecord_.[[promise]] with *undefined*.
1. DequeueValue(_stream_@[[queue]]).
1. Let _syncResult_ be SyncWritableStreamStateWithQueue(_stream_).
1. If _syncResult_ is an abrupt completion, then call-with-rethrow Call(_stream_@[[error]], *undefined*, «‍_syncResult_.[[value]]»).
1. If _syncResult_ is an abrupt completion, then return ErrorWritableStream(_stream_, ‍_syncResult_.[[value]]).
1. Otherwise, return WritableStreamAdvanceQueue(_stream_).
1. Upon rejection of _writeResult_ with reason _r_, call-with-rethrow Call(_stream_@[[error]], *undefined*, «‍_r_»).
1. Upon rejection of _writeResult_ with reason _r_, return ErrorWritableStream(_stream_, _r_).
</pre>

<h2 id="ts">Transform Streams</h2>
Expand Down Expand Up @@ -2140,9 +2140,6 @@ itself will evolve in these ways.
ReturnIfAbrupt(<var>opResult</var>)."
<li> We use <a href="https://w3ctag.github.io/promises-guide/#shorthand-phrases">the shorthand phrases from the W3C
TAG promises guide</a> to operate on promises at a higher level than the ECMAScript spec does.
<li> We introduce the notion of creating a function that "closes over" a given variable. This is meant to work the
same as how the ECMAScript spec gives such functions internal slots which get filled in upon creation and then
have their values pulled out of during execution, but require less formal contortions.
</ul>

<h2 id="acks" class="no-num">Acknowledgments</h2>
Expand Down
66 changes: 33 additions & 33 deletions reference-implementation/lib/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ export default class WritableStream {
this._started = false;
this._writing = false;

this._error = CreateWritableStreamErrorFunction(this);
const error = closure_WritableStreamErrorFunction();
error._stream = this;

SyncWritableStreamStateWithQueue(this);

const startResult = InvokeOrNoop(underlyingSink, 'start', [this._error]);
const startResult = InvokeOrNoop(underlyingSink, 'start', [error]);
this._startedPromise = Promise.resolve(startResult);
this._startedPromise.then(() => {
this._started = true;
this._startedPromise = undefined;
});
this._startedPromise.catch(r => this._error(r));
this._startedPromise.catch(r => ErrorWritableStream(this, r));
}

get closed() {
Expand Down Expand Up @@ -61,7 +62,7 @@ export default class WritableStream {
return Promise.reject(this._storedError);
}

this._error(reason);
ErrorWritableStream(this, reason);
const sinkAbortPromise = PromiseInvokeOrFallbackOrNoop(this._underlyingSink, 'abort', [reason], 'close', []);
return sinkAbortPromise.then(() => undefined);
}
Expand Down Expand Up @@ -122,15 +123,15 @@ export default class WritableStream {
try {
strategy = this._underlyingSink.strategy;
} catch (strategyE) {
this._error(strategyE);
ErrorWritableStream(this, strategyE);
return Promise.reject(strategyE);
}

if (strategy !== undefined) {
try {
chunkSize = strategy.size(chunk);
} catch (chunkSizeE) {
this._error(chunkSizeE);
ErrorWritableStream(this, chunkSizeE);
return Promise.reject(chunkSizeE);
}
}
Expand All @@ -145,14 +146,14 @@ export default class WritableStream {
try {
EnqueueValueWithSize(this._queue, writeRecord, chunkSize);
} catch (enqueueResultE) {
this._error(enqueueResultE);
ErrorWritableStream(this, enqueueResultE);
return Promise.reject(enqueueResultE);
}

try {
SyncWritableStreamStateWithQueue(this);
} catch (syncResultE) {
this._error(syncResultE);
ErrorWritableStream(this, syncResultE);
return promise;
}

Expand All @@ -161,6 +162,12 @@ export default class WritableStream {
}
}

function closure_WritableStreamErrorFunction() {
const f = e => ErrorWritableStream(f._stream, e);
return f;
}


function CallOrScheduleWritableStreamAdvanceQueue(stream) {
if (stream._started === false) {
stream._startedPromise.then(() => {
Expand Down Expand Up @@ -189,33 +196,29 @@ function CloseWritableStream(stream) {
stream._closedPromise_resolve(undefined);
stream._state = 'closed';
},
r => {
stream._error(r);
}
r => ErrorWritableStream(stream, r)
);
}

function CreateWritableStreamErrorFunction(stream) {
return e => {
if (stream._state === 'closed' || stream._state === 'errored') {
return undefined;
}
function ErrorWritableStream(stream, e) {
if (stream._state === 'closed' || stream._state === 'errored') {
return undefined;
}

while (stream._queue.length > 0) {
const writeRecord = DequeueValue(stream._queue);
if (writeRecord !== 'close') {
writeRecord._reject(e);
}
while (stream._queue.length > 0) {
const writeRecord = DequeueValue(stream._queue);
if (writeRecord !== 'close') {
writeRecord._reject(e);
}
}

stream._storedError = e;
stream._storedError = e;

if (stream._state === 'waiting') {
stream._readyPromise_resolve(undefined);
}
stream._closedPromise_reject(e);
stream._state = 'errored';
};
if (stream._state === 'waiting') {
stream._readyPromise_resolve(undefined);
}
stream._closedPromise_reject(e);
stream._state = 'errored';
}

export function IsWritableStream(x) {
Expand Down Expand Up @@ -290,14 +293,11 @@ function WritableStreamAdvanceQueue(stream) {
try {
SyncWritableStreamStateWithQueue(stream);
} catch (syncResultE) {
stream._error(syncResultE);
return;
return ErrorWritableStream(stream, syncResultE);
}
return WritableStreamAdvanceQueue(stream);
},
r => {
stream._error(r);
}
r => ErrorWritableStream(stream, r)
)
.catch(e => process.nextTick(() => { throw e; })); // to catch assertion failures
}
Expand Down