Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release reader immediately when shutting down a pipe #1208

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -2175,7 +2175,7 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
|source|.[=ReadableStream/[[storedError]]=].
1. Otherwise, [=shutdown=] with |source|.[=ReadableStream/[[storedError]]=].
1. <strong>Errors must be propagated backward:</strong> if |dest|.[=WritableStream/[[state]]=]
is or becomes "`errored`", then
is or becomes "`erroring`" or "`errored`", then
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's useful to keep the pipe going when dest has already become "erroring"? Any new writes will just error immediately, as per step 9 of WritableStreamDefaultWriterWrite.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

1. If |preventCancel| is false, [=shutdown with an action=] of !
[$ReadableStreamCancel$](|source|, |dest|.[=WritableStream/[[storedError]]=]) and with
|dest|.[=WritableStream/[[storedError]]=].
Expand All @@ -2198,6 +2198,17 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
|originalError|, then:
1. If |shuttingDown| is true, abort these substeps.
1. Set |shuttingDown| to true.
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
<p class="note">The initial reader is released to ensure that any pending read requests
are immediately aborted, and no more chunks are pulled from |source|. A new reader is
acquired in order to keep |source| locked until the shutdown is [=finalized=], for example
to [=cancel a readable stream|cancel=] |source| if necessary.
This exchange of readers is not observable to author code and the user agent is free to
implement this differently, for example by keeping the same reader and internally aborting
its pending read requests.
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
Expand All @@ -2210,6 +2221,10 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
ask to shutdown, optionally with an error |error|, then:
1. If |shuttingDown| is true, abort these substeps.
1. Set |shuttingDown| to true.
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
MattiasBuelens marked this conversation as resolved.
Show resolved Hide resolved
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
Expand All @@ -2218,10 +2233,9 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
1. [=Finalize=], passing along |error| if it was given.
* <dfn id="rs-pipeTo-finalize"><i>Finalize</i></dfn>: both forms of shutdown will eventually ask
to finalize, optionally with an error |error|, which means to perform the following steps:
1. Assert: |reader| [=implements=] {{ReadableStreamDefaultReader}}.
1. Perform ! [$WritableStreamDefaultWriterRelease$](|writer|).
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. Perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. If |signal| is not undefined, [=AbortSignal/remove=] |abortAlgorithm| from |signal|.
1. If |error| was given, [=reject=] |promise| with |error|.
1. Otherwise, [=resolve=] |promise| with undefined.
Expand Down
91 changes: 66 additions & 25 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js')
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = require('./writable-streams.js');
WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight, writerAddStateChangeListener } =
require('./writable-streams.js');
const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js');

const ReadableByteStreamController = require('../../generated/ReadableByteStreamController.js');
Expand Down Expand Up @@ -134,7 +135,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
assert(IsReadableStreamLocked(source) === false);
assert(IsWritableStreamLocked(dest) === false);

const reader = AcquireReadableStreamDefaultReader(source);
let reader = AcquireReadableStreamDefaultReader(source);
const writer = AcquireWritableStreamDefaultWriter(dest);

source._disturbed = true;
Expand Down Expand Up @@ -200,6 +201,12 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

return transformPromiseWith(writer._readyPromise, () => {
if (shuttingDown === true) {
return promiseResolvedWith(true);
}
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) {
return promiseResolvedWith(true);
}
Comment on lines +207 to +209
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implements @domenic's suggestion from #1207 (comment).

I don't know if we need to update the spec text for this. It already specifies that these checks must happen before performing any reads and writes:

Shutdown must stop activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks, as described below. In particular, the user agent must check the below conditions before performing any reads or writes, since they might lead to immediate shutdown.

We should still add a test for this particular case (although that might not be easy looking at the discussion in #1207).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't think we need to update the spec text.

return new Promise((resolveRead, rejectRead) => {
ReadableStreamDefaultReaderRead(
reader,
Expand All @@ -219,31 +226,50 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

// Errors must be propagated forward
isOrBecomesErrored(source, reader._closedPromise, storedError => {
function sourceIsOrBecomesErrored() {
const storedError = source._storedError;
if (preventAbort === false) {
shutdownWithAction(() => WritableStreamAbort(dest, storedError), true, storedError);
} else {
shutdown(true, storedError);
}
});
}

// Errors must be propagated backward
isOrBecomesErrored(dest, writer._closedPromise, storedError => {
function destIsOrBecomesErroringOrErrored() {
const storedError = dest._storedError;
if (preventCancel === false) {
shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError);
} else {
shutdown(true, storedError);
}
});
}

// Closing must be propagated forward
isOrBecomesClosed(source, reader._closedPromise, () => {
function sourceIsOrBecomesClosed() {
if (preventClose === false) {
shutdownWithAction(() => WritableStreamDefaultWriterCloseWithErrorPropagation(writer));
} else {
shutdown();
}
});
}

function checkState() {
const sourceState = source._state;
const destState = dest._state;
if (sourceState === 'errored') {
// Errors must be propagated forward
sourceIsOrBecomesErrored();
} else if (destState === 'erroring' || destState === 'errored') {
// Errors must be propagated backward
destIsOrBecomesErroringOrErrored();
} else if (sourceState === 'closed') {
// Closing must be propagated forward
sourceIsOrBecomesClosed();
}
}

checkState();

// Closing must be propagated backward
if (WritableStreamCloseQueuedOrInFlight(dest) === true || dest._state === 'closed') {
Expand All @@ -256,7 +282,13 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}
}

setPromiseIsHandledToTrue(pipeLoop());
if (!shuttingDown) {
assert(source._state === 'readable' && dest._state === 'writable');
readerAddStateChangeListener(reader, checkState);
writerAddStateChangeListener(writer, checkState);

setPromiseIsHandledToTrue(pipeLoop());
}

function waitForWritesToFinish() {
// Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait
Expand All @@ -268,27 +300,13 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
);
}

function isOrBecomesErrored(stream, promise, action) {
if (stream._state === 'errored') {
action(stream._storedError);
} else {
uponRejection(promise, action);
}
}

function isOrBecomesClosed(stream, promise, action) {
if (stream._state === 'closed') {
action();
} else {
uponFulfillment(promise, action);
}
}

function shutdownWithAction(action, originalIsError, originalError) {
if (shuttingDown === true) {
return;
}
shuttingDown = true;
ReadableStreamDefaultReaderRelease(reader);
reader = AcquireReadableStreamDefaultReader(source);

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), doTheRest);
Expand All @@ -310,6 +328,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
return;
}
shuttingDown = true;
ReadableStreamDefaultReaderRelease(reader);
reader = AcquireReadableStreamDefaultReader(source);

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error));
Expand All @@ -319,6 +339,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

function finalize(isError, error) {
assert(ReadableStreamDefaultReader.isImpl(reader));
WritableStreamDefaultWriterRelease(writer);
ReadableStreamDefaultReaderRelease(reader);

Expand Down Expand Up @@ -770,6 +791,7 @@ function ReadableStreamClose(stream) {
}

resolvePromise(reader._closedPromise, undefined);
readerRunStateChangeListeners(reader);

if (ReadableStreamDefaultReader.isImpl(reader)) {
const readRequests = reader._readRequests;
Expand All @@ -794,6 +816,7 @@ function ReadableStreamError(stream, e) {

rejectPromise(reader._closedPromise, e);
setPromiseIsHandledToTrue(reader._closedPromise);
readerRunStateChangeListeners(reader);

if (ReadableStreamDefaultReader.isImpl(reader)) {
ReadableStreamDefaultReaderErrorReadRequests(reader, e);
Expand Down Expand Up @@ -877,6 +900,8 @@ function ReadableStreamReaderGenericInitialize(reader, stream) {
reader._stream = stream;
stream._reader = reader;

reader._stateChangeListeners = [];

if (stream._state === 'readable') {
reader._closedPromise = newPromise();
} else if (stream._state === 'closed') {
Expand Down Expand Up @@ -910,6 +935,22 @@ function ReadableStreamReaderGenericRelease(reader) {

stream._reader = undefined;
reader._stream = undefined;

reader._stateChangeListeners = [];
}

function readerAddStateChangeListener(reader, stateChangeListener) {
const stream = reader._stream;
assert(stream !== undefined);
reader._stateChangeListeners.push(stateChangeListener);
}

function readerRunStateChangeListeners(reader) {
const stateChangeListeners = reader._stateChangeListeners;
reader._stateChangeListeners = [];
for (const stateChangeListener of stateChangeListeners) {
stateChangeListener();
}
}

function ReadableStreamBYOBReaderRead(reader, view, readIntoRequest) {
Expand Down
28 changes: 27 additions & 1 deletion reference-implementation/lib/abstract-ops/writable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ Object.assign(exports, {
WritableStreamDefaultWriterCloseWithErrorPropagation,
WritableStreamDefaultWriterGetDesiredSize,
WritableStreamDefaultWriterRelease,
WritableStreamDefaultWriterWrite
WritableStreamDefaultWriterWrite,
writerAddStateChangeListener
});

// Working with writable streams
Expand Down Expand Up @@ -143,6 +144,8 @@ function SetUpWritableStreamDefaultWriter(writer, stream) {
writer._stream = stream;
stream._writer = writer;

writer._stateChangeListeners = [];

const state = stream._state;

if (state === 'writable') {
Expand Down Expand Up @@ -239,6 +242,11 @@ function WritableStreamFinishErroring(stream) {
}
stream._writeRequests = [];

const writer = stream._writer;
if (writer !== undefined) {
writerRunStateChangeListeners(writer);
}

if (stream._pendingAbortRequest === undefined) {
WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
return;
Expand Down Expand Up @@ -289,6 +297,7 @@ function WritableStreamFinishInFlightClose(stream) {
const writer = stream._writer;
if (writer !== undefined) {
resolvePromise(writer._closedPromise, undefined);
writerRunStateChangeListeners(writer);
}

assert(stream._pendingAbortRequest === undefined);
Expand Down Expand Up @@ -378,6 +387,7 @@ function WritableStreamStartErroring(stream, reason) {
const writer = stream._writer;
if (writer !== undefined) {
WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
writerRunStateChangeListeners(writer, reason);
}

if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) {
Expand Down Expand Up @@ -491,6 +501,8 @@ function WritableStreamDefaultWriterRelease(writer) {

stream._writer = undefined;
writer._stream = undefined;

writer._stateChangeListeners = [];
}

function WritableStreamDefaultWriterWrite(writer, chunk) {
Expand Down Expand Up @@ -526,6 +538,20 @@ function WritableStreamDefaultWriterWrite(writer, chunk) {
return promise;
}

function writerAddStateChangeListener(writer, stateChangeListener) {
const stream = writer._stream;
assert(stream !== undefined);
writer._stateChangeListeners.push(stateChangeListener);
}

function writerRunStateChangeListeners(writer) {
const stateChangeListeners = writer._stateChangeListeners;
writer._stateChangeListeners = [];
for (const stateChangeListener of stateChangeListeners) {
stateChangeListener();
}
}

// Default controllers

function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm,
Expand Down
2 changes: 1 addition & 1 deletion reference-implementation/web-platform-tests