-
Notifications
You must be signed in to change notification settings - Fork 29.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lib: implement various stream utils for webstreams #39517
Conversation
Implements the finished, addAbortSignal, and pipeline functions for webstreams, and adds the documentation and tests for each function. The promisified versions of finished and pipeline are also added. Refs: #39316
|
||
throw new ERR_INVALID_ARG_TYPE( | ||
'stream', | ||
'ReadableStream|WritableStream|TransformStream', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'ReadableStream|WritableStream|TransformStream', | |
['ReadableStream', 'WritableStream', 'TransformStream'], |
if (streams.some(cannotWriteToStream)) { | ||
throw new ERR_INVALID_ARG_TYPE( | ||
'streams', | ||
'WritableStream|TransformStream', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'WritableStream|TransformStream', | |
['WritableStream', 'TransformStream'], |
if (streams.some(cannotReadFromStream)) { | ||
throw new ERR_INVALID_ARG_TYPE( | ||
'streams', | ||
'ReadableStream|TransformStream', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'ReadableStream|TransformStream', | |
['ReadableStream', 'TransformStream'], |
A lot of unrelated style changes in |
|
||
function readableStreamAddFinishedCallback(stream, callback) { | ||
assert(isReadableStream(stream)); | ||
stream[kState].finishedCallbacks.push(callback); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stream[kState].finishedCallbacks.push(callback); | |
ArrayPrototypePush(stream[kState].finishedCallbacks, callback); |
controller[kState].started = true; | ||
writableStreamDealWithRejection(stream, error); | ||
}); | ||
} | ||
|
||
function writableStreamAddFinishedCallback(stream, callback) { | ||
assert(isWritableStream(stream)); | ||
stream[kState].finishedCallbacks.push(callback); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stream[kState].finishedCallbacks.push(callback); | |
ArrayPrototypePush(stream[kState].finishedCallbacks, callback); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code contains utility functions that are only used once (such as signalIsAborted
, validateInputs
, throwErrorIfNotWebstream
, addAbortCallbackToSignal
, etc.), which makes the code hard to follow. Could you try to remove them? Instead you can use comments to explain what the code is doing.
@@ -1267,6 +1270,135 @@ added: REPLACEME | |||
--> | |||
|
|||
* Type: {WritableStream} | |||
### `webstream.finished(stream, callback)` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
### `webstream.finished(stream, callback)` | |
### `webstream.finished(stream, callback)` |
``` | ||
The `finished` API also provides promise version: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
``` | |
The `finished` API also provides promise version: | |
``` | |
The `finished` API also provides promise version: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should just add support for web streams in existing stream functions. Not implement new ones.
@VoltrexMaster The only formatting tool I used was eslint with the given config file in the repo, so I'm unsure as to why this has happened. Let me know if I'm missing something, but otherwise I'll see if I can sort this out. |
@ronag Understood. Is it satisfactory to add a conditional flow in the existing functions which calls the functions added in this request? |
validateInputs(signal, stream); | ||
|
||
const onAbort = () => { | ||
stream.abort(new AbortError()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
abort doesn't exist on web streams? Do they?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only on WritableStream
. On ReadableStream
it needs to be cancel()
.
There is another fundamental problem with this, however... if the ReadableStream
has a Reader
, or the WritableStream
has a Writer
, their cancel()
and abort()
methods cannot be called directly... and there's no public API for knowing if either is locked. If they are, there's no public API for getting the Reader or the Writer and those have to be used for doing the canceling/aborting.
Instead, in order for us to do this, we need to rely on the internal functions and cannot rely on the public API. This also means that these will only work for the core implementations of web streams (userland implementations would not be supported).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jasnell do you think this finished
for webstreams is still a worthwhile development given that this is the case?
I think for For For |
@@ -33,6 +33,9 @@ There are three primary types of objects | |||
* `WritableStream` - Represents a destination for streaming data. | |||
* `TransformStream` - Represents an algorithm for transforming streaming data. | |||
|
|||
Additionally, this module includes the utility functions | |||
`pipeline()`, `finished()`, and `addAbortSignal()`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm -1 on having this also exported on stream/web
. Just have the stream
exports support the additional types.
const { | ||
DOMException, | ||
} = internalBinding('messaging'); | ||
const { DOMException } = internalBinding('messaging'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated style changes. Please leave the require statements formatted as they are.
@@ -216,7 +207,8 @@ class ReadableStream { | |||
setupReadableByteStreamControllerFromSource( | |||
this, | |||
source, | |||
extractHighWaterMark(highWaterMark, 0)); | |||
extractHighWaterMark(highWaterMark, 0) | |||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of the unrelated style changes in here make this difficult to review.
*/ | ||
abort(reason = undefined) { | ||
this.cancel(reason); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot add methods to the standard class.
*/ | ||
abort(reason = undefined) { | ||
this?.readable?.abort?.(reason); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot add methods to the standard API
stream[kState].finishedCallbacks?.forEach((cb) => { | ||
cb(stream[kState].storedError); | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is the wrong approach. the WritableStream
and ReadableStream
APIs already have mechanisms for monitoring when each are closed via the closed
promises on the Writer
and Reader
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I appreciate the work here but this has quite a few reasons why it's the wrong approach.
First, there are way too many unrelated style changes that make it impossible to effectively review. All of the purely style changes should be removed.
Second, this adds non-standard methods to the ReadableStream
and TransportStream
APIs that we just can't add.
Third, the abort controller implementation fails to account for locked streams.
Fourth, this should modify the existing require('stream').pipeline|finished|addAbortSignal()
methods in place rather than creating new versions of them, creating new exports off stream/web
, or introducing the new stream/web/promises
.
Fifth, the way this adds the finish callbacks is problematic and definitely not ideal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm -1 for landing this. Support for webstreams should be added to the utilities we already have.
Please always tag @nodejs/streams when discussing streams PRs.
Thanks for all the feedback and sorry for it being a bit of a mess! Looks like I have some work to do... Seeing as though it'll be a good few changes, I'm going to close this PR for now. |
Any updates on this @sam-stanford ? Is this still in progress? |
Yep! Been away since I last took a look at this, so a little behind where is expected, but I'm making progress. I'm still learning the ropes for node and open source as a whole, so if this is something which needs doing quickly, feel free to take the reigns :) |
All right, I'll take a stab at it... Great work so far though! |
Implements the finished, addAbortSignal, and pipeline functions for webstreams, and adds the documentation and tests for each function. The promisified versions of finished and pipeline are also added.
Refs: #39316