-
Notifications
You must be signed in to change notification settings - Fork 29.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lib: implement various stream utils for webstreams #39517
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -33,6 +33,9 @@ There are three primary types of objects | |||||||||||
* `WritableStream` - Represents a destination for streaming data. | ||||||||||||
* `TransformStream` - Represents an algorithm for transforming streaming data. | ||||||||||||
|
||||||||||||
Additionally, this module includes the utility functions | ||||||||||||
`pipeline()`, `finished()`, and `addAbortSignal()`. | ||||||||||||
|
||||||||||||
### Example `ReadableStream` | ||||||||||||
|
||||||||||||
This example creates a simple `ReadableStream` that pushes the current | ||||||||||||
|
@@ -1267,6 +1270,135 @@ added: REPLACEME | |||||||||||
--> | ||||||||||||
|
||||||||||||
* Type: {WritableStream} | ||||||||||||
### `webstream.finished(stream, callback)` | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
|
||||||||||||
* `stream` {Stream} A ReadableStream, WritableStream, or TransformStream. | ||||||||||||
* `callback` {Function} A callback function that takes an optional error | ||||||||||||
argument. | ||||||||||||
* Returns: {Function} A cleanup function which removes all callbacks with | ||||||||||||
the given stream. | ||||||||||||
|
||||||||||||
A function to get notified when a stream is no longer readable, writable | ||||||||||||
or has experienced an error or a premature close event. | ||||||||||||
|
||||||||||||
```js | ||||||||||||
const { finished, ReadableStream } = require('stream/web'); | ||||||||||||
|
||||||||||||
const rs = new ReadableStream(); | ||||||||||||
|
||||||||||||
finished(rs, (err) => { | ||||||||||||
// This will be called when the stream finishes | ||||||||||||
if (err) { | ||||||||||||
console.error('Stream failed.', err); | ||||||||||||
} else { | ||||||||||||
console.log('Stream is done reading.'); | ||||||||||||
} | ||||||||||||
}); | ||||||||||||
``` | ||||||||||||
The `finished` API also provides promise version: | ||||||||||||
Comment on lines
+1297
to
+1298
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
|
||||||||||||
```js | ||||||||||||
const { finished, ReadableStream } = require('stream/web/promises'); | ||||||||||||
|
||||||||||||
const rs = new ReadableStream(); | ||||||||||||
|
||||||||||||
async function run() { | ||||||||||||
await finished(rs); | ||||||||||||
console.log('Stream is done reading.'); | ||||||||||||
} | ||||||||||||
|
||||||||||||
run().catch(console.error); | ||||||||||||
``` | ||||||||||||
|
||||||||||||
Unlike `stream.finished()`, `webstream.finished()` does not | ||||||||||||
leaves dangling event listeners after `callback` has been invoked. | ||||||||||||
However, to unify the behaviour of the two functions, a cleanup | ||||||||||||
function is returned by `webstream.finished()`. This cleanup | ||||||||||||
function removes **all** registered callbacks from the stream. | ||||||||||||
|
||||||||||||
```js | ||||||||||||
const removeCallbacks = finished(rs, callback); | ||||||||||||
removeCallbacks(); // Callback will no longer call on finish | ||||||||||||
``` | ||||||||||||
|
||||||||||||
### `webstream.pipeline(source[, ...transforms], destination, callback)` | ||||||||||||
### `webstream.pipeline(streams, callback)` | ||||||||||||
|
||||||||||||
* `streams` {Stream[]} | ||||||||||||
* `source` {Stream} | ||||||||||||
* `...transforms` {Stream} | ||||||||||||
* `destination` {Stream} | ||||||||||||
* `callback` {Function} Called when the pipeline is fully done. | ||||||||||||
* `err` {Error} | ||||||||||||
* `val` Resolved value of `Promise` returned by `destination`. | ||||||||||||
|
||||||||||||
A module method to pipe between streams, forwarding errors and | ||||||||||||
properly cleaning up and provide a callback when the pipeline is complete. | ||||||||||||
|
||||||||||||
```js | ||||||||||||
const { pipeline } = require('stream/web'); | ||||||||||||
|
||||||||||||
// Use the pipeline API to easily pipe a series of streams | ||||||||||||
// together and get notified when the pipeline is fully done. | ||||||||||||
|
||||||||||||
pipeline( | ||||||||||||
new ReadableStream(), | ||||||||||||
new TransformStream(), | ||||||||||||
new WritableStream(), | ||||||||||||
(err) => { | ||||||||||||
if (err) { | ||||||||||||
console.error('Pipeline failed.', err); | ||||||||||||
} else { | ||||||||||||
console.log('Pipeline succeeded.'); | ||||||||||||
} | ||||||||||||
} | ||||||||||||
); | ||||||||||||
``` | ||||||||||||
|
||||||||||||
The `pipeline` API also provides a promise version. | ||||||||||||
|
||||||||||||
```js | ||||||||||||
const { | ||||||||||||
ReadableStream, | ||||||||||||
WritableStream, | ||||||||||||
TransformStream, | ||||||||||||
} = require('stream/web'); | ||||||||||||
const { pipeline } = require('stream/web/promises'); | ||||||||||||
|
||||||||||||
async function run() { | ||||||||||||
await pipeline( | ||||||||||||
new ReadableStream(), | ||||||||||||
new TransformStream(), | ||||||||||||
new WritableStream(), | ||||||||||||
); | ||||||||||||
console.log('Pipeline succeeded.'); | ||||||||||||
} | ||||||||||||
|
||||||||||||
run().catch(console.error); | ||||||||||||
``` | ||||||||||||
|
||||||||||||
### `webstream.addAbortSignal(signal, stream)` | ||||||||||||
* `signal` {AbortSignal} A signal representing possible cancellation | ||||||||||||
* `stream` {Stream} a stream to attach a signal to | ||||||||||||
|
||||||||||||
Attaches an AbortSignal to a readable or writeable stream. This lets code | ||||||||||||
control stream destruction using an `AbortController`. | ||||||||||||
|
||||||||||||
Calling `abort` on the `AbortController` corresponding to the passed | ||||||||||||
`AbortSignal` will abort the stream using the most appropriate functionality | ||||||||||||
for the stream. | ||||||||||||
|
||||||||||||
```js | ||||||||||||
const { ReadableStream, addAbortSignal } = require('stream/web'); | ||||||||||||
|
||||||||||||
const controller = new AbortController(); | ||||||||||||
addAbortSignal( | ||||||||||||
controller.signal, | ||||||||||||
new ReadableStream(), | ||||||||||||
); | ||||||||||||
// Later, abort the operation closing the stream | ||||||||||||
controller.abort(); | ||||||||||||
``` | ||||||||||||
|
||||||||||||
[Streams]: stream.md | ||||||||||||
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
'use strict'; | ||
|
||
const { validateAbortSignal } = require('internal/validators'); | ||
const { isWebstream } = require('internal/webstreams/util'); | ||
const { finished } = require('internal/webstreams/finished'); | ||
const { AbortError, codes } = require('internal/errors'); | ||
const { ERR_INVALID_ARG_TYPE } = codes; | ||
|
||
function addAbortSignal(signal, stream) { | ||
validateInputs(signal, stream); | ||
|
||
const onAbort = () => { | ||
stream.abort(new AbortError()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. abort doesn't exist on web streams? Do they? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only on There is another fundamental problem with this, however... if the Instead, in order for us to do this, we need to rely on the internal functions and cannot rely on the public API. This also means that these will only work for the core implementations of web streams (userland implementations would not be supported). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jasnell do you think this |
||
}; | ||
|
||
if (signalIsAborted(signal)) { | ||
onAbort(); | ||
} else { | ||
addAbortCallbackToSignal(signal, stream, onAbort); | ||
} | ||
|
||
return stream; | ||
} | ||
|
||
function validateInputs(signal, stream) { | ||
validateAbortSignal(signal, 'signal'); | ||
throwErrorIfNotWebstream(stream); | ||
} | ||
|
||
function throwErrorIfNotWebstream(stream) { | ||
if (isWebstream(stream)) { | ||
return; | ||
} | ||
|
||
throw new ERR_INVALID_ARG_TYPE( | ||
'stream', | ||
'ReadableStream|WritableStream|TransformStream', | ||
stream, | ||
); | ||
} | ||
|
||
function signalIsAborted(signal) { | ||
return signal.aborted; | ||
} | ||
|
||
function addAbortCallbackToSignal(signal, stream, callback) { | ||
signal.addEventListener('abort', callback); | ||
finished(stream, () => { | ||
signal.removeEventListener('abort', callback); | ||
}); | ||
} | ||
|
||
module.exports = { | ||
addAbortSignal | ||
}; |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,118 @@ | ||||||
'use strict'; | ||||||
|
||||||
const { validateFunction } = require('internal/validators'); | ||||||
const { isWebstream } = require('internal/webstreams/util'); | ||||||
const { isTransformStream } = require('internal/webstreams/transformstream'); | ||||||
const { | ||||||
isWritableStreamClosed, | ||||||
isWritableStreamErrored, | ||||||
writableStreamAddFinishedCallback, | ||||||
writableStreamRemoveAllFinishedCallbacks, | ||||||
} = require('internal/webstreams/writablestream'); | ||||||
const { | ||||||
isReadableStream, | ||||||
isReadableStreamClosed, | ||||||
isReadableStreamErrored, | ||||||
readableStreamAddFinishedCallback, | ||||||
readableStreamRemoveAllFinishedCallbacks, | ||||||
} = require('internal/webstreams/readablestream'); | ||||||
const { once } = require('internal/util'); | ||||||
const { codes } = require('internal/errors'); | ||||||
const { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS } = codes; | ||||||
|
||||||
function finished(stream, callback) { | ||||||
validateAndFixInputs(stream, callback); | ||||||
|
||||||
if (isTransformStream(stream)) { | ||||||
return finishedForTransformStream(stream, callback); | ||||||
} | ||||||
|
||||||
addCallbackToStream(stream, callback); | ||||||
callCallbackIfStreamIsFinished(stream, callback); | ||||||
|
||||||
return getCleanupFuncForStream(stream); | ||||||
} | ||||||
|
||||||
function validateAndFixInputs(stream, callback) { | ||||||
throwErrorIfNullStream(stream); | ||||||
callback = replaceWithNoOpIfNull(callback); | ||||||
validateInputs(stream, callback); | ||||||
return stream, once(callback); | ||||||
} | ||||||
|
||||||
function throwErrorIfNullStream(stream) { | ||||||
if (!stream) { | ||||||
throw new ERR_MISSING_ARGS('stream'); | ||||||
} | ||||||
} | ||||||
|
||||||
function replaceWithNoOpIfNull(callback) { | ||||||
if (!callback) { | ||||||
callback = () => {}; | ||||||
} | ||||||
return callback; | ||||||
} | ||||||
|
||||||
function validateInputs(stream, callback) { | ||||||
validateFunction(callback, 'callback'); | ||||||
throwErrorIfNotWebstream(stream); | ||||||
} | ||||||
|
||||||
function throwErrorIfNotWebstream(stream) { | ||||||
if (isWebstream(stream)) { | ||||||
return; | ||||||
} | ||||||
|
||||||
throw new ERR_INVALID_ARG_TYPE( | ||||||
'stream', | ||||||
'ReadableStream|WritableStream|TransformStream', | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
stream | ||||||
); | ||||||
} | ||||||
|
||||||
function finishedForTransformStream(transformStream, callback) { | ||||||
return finished(transformStream.readable, callback); | ||||||
} | ||||||
|
||||||
function addCallbackToStream(stream, callback) { | ||||||
if (isReadableStream(stream)) { | ||||||
readableStreamAddFinishedCallback(stream, callback); | ||||||
return; | ||||||
} | ||||||
writableStreamAddFinishedCallback(stream, callback); | ||||||
} | ||||||
|
||||||
function callCallbackIfStreamIsFinished(stream, callback) { | ||||||
if (isWebstreamFinished(stream)) { | ||||||
callback(); | ||||||
} | ||||||
} | ||||||
|
||||||
function isWebstreamFinished(stream) { | ||||||
if (isReadableStream(stream)) { | ||||||
return isReadableStreamFinished(stream); | ||||||
} | ||||||
return isWritableStreamFinished(stream); | ||||||
} | ||||||
|
||||||
function isReadableStreamFinished(rs) { | ||||||
return isReadableStreamClosed(rs) || isReadableStreamErrored(rs); | ||||||
} | ||||||
|
||||||
function isWritableStreamFinished(ws) { | ||||||
return isWritableStreamClosed(ws) || isWritableStreamErrored(ws); | ||||||
} | ||||||
|
||||||
function getCleanupFuncForStream(stream) { | ||||||
if (isReadableStream(stream)) { | ||||||
return () => { | ||||||
readableStreamRemoveAllFinishedCallbacks(stream); | ||||||
}; | ||||||
} | ||||||
|
||||||
return () => { | ||||||
writableStreamRemoveAllFinishedCallbacks(stream); | ||||||
}; | ||||||
} | ||||||
|
||||||
module.exports = { finished }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm -1 on having this also exported on
stream/web
. Just have thestream
exports support the additional types.