-
-
Notifications
You must be signed in to change notification settings - Fork 33.2k
streams: implement Readable.from async iterator utility #27660
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,8 +46,8 @@ There are four fundamental stream types within Node.js: | |
* [`Transform`][] - `Duplex` streams that can modify or transform the data as it | ||
is written and read (for example, [`zlib.createDeflate()`][]). | ||
|
||
Additionally, this module includes the utility functions [pipeline][] and | ||
[finished][]. | ||
Additionally, this module includes the utility functions [pipeline][], | ||
[finished][] and [Readable.from][]. | ||
|
||
### Object Mode | ||
|
||
|
@@ -1480,6 +1480,31 @@ async function run() { | |
run().catch(console.error); | ||
``` | ||
|
||
### Readable.from(iterable, [options]) | ||
|
||
* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or | ||
`Symbol.iterator` iterable protocol. | ||
* `options` {Object} Options provided to `new stream.Readable([options])`. | ||
By default, `Readable.from()` will set `options.objectMode` to `true`, unless | ||
this is explicitly opted out by setting `options.objectMode` to `false`. | ||
|
||
A utility method for creating Readable Streams out of iterators. | ||
|
||
```js | ||
const { Readable } = require('stream'); | ||
|
||
async function * generate() { | ||
yield 'hello'; | ||
yield 'streams'; | ||
} | ||
|
||
const readable = Readable.from(generate()); | ||
|
||
readable.on('data', (chunk) => { | ||
console.log(chunk); | ||
}); | ||
``` | ||
|
||
## API for Stream Implementers | ||
|
||
<!--type=misc--> | ||
|
@@ -2395,6 +2420,89 @@ primarily for examples and testing, but there are some use cases where | |
|
||
<!--type=misc--> | ||
|
||
### Streams Compatibility with Async Generators and Async Iterators | ||
|
||
With the support of async generators and iterators in JavaScript, async | ||
generators are effectively a first-class language-level stream construct at | ||
this point. | ||
|
||
Some common interop cases of using Node.js streams with async generators | ||
and async iterators are provided below. | ||
|
||
#### Consuming Readable Streams with Async Iterators | ||
|
||
```js | ||
(async function() { | ||
for await (const chunk of readable) { | ||
console.log(chunk); | ||
} | ||
})(); | ||
``` | ||
|
||
#### Creating Readable Streams with Async Generators | ||
|
||
We can construct a Node.js Readable Stream from an asynchronous generator | ||
using the `Readable.from` utility method: | ||
|
||
```js | ||
const { Readable } = require('stream'); | ||
|
||
async function * generate() { | ||
yield 'a'; | ||
yield 'b'; | ||
yield 'c'; | ||
} | ||
|
||
const readable = Readable.from(generate()); | ||
|
||
readable.on('data', (chunk) => { | ||
console.log(chunk); | ||
}); | ||
``` | ||
|
||
#### Piping to Writable Streams from Async Iterators | ||
|
||
In the scenario of writing to a writeable stream from an async iterator, | ||
it is important to ensure the correct handling of backpressure and errors. | ||
|
||
```js | ||
const { once } = require('events'); | ||
|
||
const writeable = fs.createWriteStream('./file'); | ||
|
||
(async function() { | ||
for await (const chunk of iterator) { | ||
// Handle backpressure on write | ||
if (!writeable.write(value)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be |
||
await once(writeable, 'drain'); | ||
} | ||
writeable.end(); | ||
// Ensure completion without errors | ||
await once(writeable, 'finish'); | ||
})(); | ||
``` | ||
guybedford marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
In the above, errors on the write stream would be caught and thrown by the two | ||
`once` listeners, since `once` will also handle `'error'` events. | ||
|
||
Alternatively the readable stream could be wrapped with `Readable.from` and | ||
then piped via `.pipe`: | ||
|
||
```js | ||
const { once } = require('events'); | ||
|
||
const writeable = fs.createWriteStream('./file'); | ||
|
||
(async function() { | ||
const readable = Readable.from(iterator); | ||
readable.pipe(writeable); | ||
// Ensure completion without errors | ||
await once(writeable, 'finish'); | ||
})(); | ||
``` | ||
|
||
<!--type=misc--> | ||
|
||
### Compatibility with Older Node.js Versions | ||
|
||
<!--type=misc--> | ||
|
@@ -2531,6 +2639,7 @@ contain multi-byte characters. | |
[Compatibility]: #stream_compatibility_with_older_node_js_versions | ||
[HTTP requests, on the client]: http.html#http_class_http_clientrequest | ||
[HTTP responses, on the server]: http.html#http_class_http_serverresponse | ||
[Readable.from]: #readable.from | ||
[TCP sockets]: net.html#net_class_net_socket | ||
[child process stdin]: child_process.html#child_process_subprocess_stdin | ||
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,4 +90,4 @@ Promise.all([ | |
catchesErrors(), | ||
stopListeningAfterCatchingError(), | ||
onceError() | ||
]); | ||
]).then(common.mustCall()); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
'use strict'; | ||
guybedford marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
const { mustCall } = require('../common'); | ||
const { once } = require('events'); | ||
const { Readable } = require('stream'); | ||
const { strictEqual } = require('assert'); | ||
|
||
async function toReadableBasicSupport() { | ||
async function * generate() { | ||
yield 'a'; | ||
yield 'b'; | ||
yield 'c'; | ||
} | ||
|
||
const stream = Readable.from(generate()); | ||
|
||
const expected = ['a', 'b', 'c']; | ||
|
||
for await (const chunk of stream) { | ||
strictEqual(chunk, expected.shift()); | ||
} | ||
} | ||
|
||
async function toReadableSyncIterator() { | ||
function * generate() { | ||
yield 'a'; | ||
yield 'b'; | ||
yield 'c'; | ||
} | ||
|
||
const stream = Readable.from(generate()); | ||
|
||
const expected = ['a', 'b', 'c']; | ||
|
||
for await (const chunk of stream) { | ||
strictEqual(chunk, expected.shift()); | ||
} | ||
} | ||
|
||
async function toReadablePromises() { | ||
const promises = [ | ||
Promise.resolve('a'), | ||
Promise.resolve('b'), | ||
Promise.resolve('c') | ||
]; | ||
|
||
const stream = Readable.from(promises); | ||
|
||
const expected = ['a', 'b', 'c']; | ||
|
||
for await (const chunk of stream) { | ||
strictEqual(chunk, expected.shift()); | ||
} | ||
} | ||
|
||
async function toReadableString() { | ||
const stream = Readable.from('abc'); | ||
|
||
const expected = ['a', 'b', 'c']; | ||
|
||
for await (const chunk of stream) { | ||
strictEqual(chunk, expected.shift()); | ||
} | ||
} | ||
|
||
async function toReadableOnData() { | ||
async function * generate() { | ||
yield 'a'; | ||
yield 'b'; | ||
yield 'c'; | ||
} | ||
|
||
const stream = Readable.from(generate()); | ||
|
||
let iterations = 0; | ||
const expected = ['a', 'b', 'c']; | ||
|
||
stream.on('data', (chunk) => { | ||
iterations++; | ||
strictEqual(chunk, expected.shift()); | ||
}); | ||
|
||
await once(stream, 'end'); | ||
|
||
strictEqual(iterations, 3); | ||
} | ||
|
||
async function toReadableOnDataNonObject() { | ||
async function * generate() { | ||
yield 'a'; | ||
yield 'b'; | ||
yield 'c'; | ||
} | ||
|
||
const stream = Readable.from(generate(), { objectMode: false }); | ||
|
||
let iterations = 0; | ||
const expected = ['a', 'b', 'c']; | ||
|
||
stream.on('data', (chunk) => { | ||
iterations++; | ||
strictEqual(chunk instanceof Buffer, true); | ||
strictEqual(chunk.toString(), expected.shift()); | ||
}); | ||
|
||
await once(stream, 'end'); | ||
|
||
strictEqual(iterations, 3); | ||
} | ||
|
||
async function destroysTheStreamWhenThrowing() { | ||
async function * generate() { | ||
throw new Error('kaboom'); | ||
} | ||
|
||
const stream = Readable.from(generate()); | ||
|
||
stream.read(); | ||
|
||
try { | ||
await once(stream, 'error'); | ||
} catch (err) { | ||
strictEqual(err.message, 'kaboom'); | ||
strictEqual(stream.destroyed, true); | ||
} | ||
} | ||
|
||
async function asTransformStream() { | ||
async function * generate(stream) { | ||
for await (const chunk of stream) { | ||
yield chunk.toUpperCase(); | ||
} | ||
} | ||
|
||
const source = new Readable({ | ||
objectMode: true, | ||
read() { | ||
this.push('a'); | ||
this.push('b'); | ||
this.push('c'); | ||
this.push(null); | ||
} | ||
}); | ||
|
||
const stream = Readable.from(generate(source)); | ||
|
||
const expected = ['A', 'B', 'C']; | ||
|
||
for await (const chunk of stream) { | ||
strictEqual(chunk, expected.shift()); | ||
} | ||
} | ||
|
||
Promise.all([ | ||
guybedford marked this conversation as resolved.
Show resolved
Hide resolved
|
||
toReadableBasicSupport(), | ||
toReadableSyncIterator(), | ||
toReadablePromises(), | ||
toReadableString(), | ||
toReadableOnData(), | ||
toReadableOnDataNonObject(), | ||
destroysTheStreamWhenThrowing(), | ||
asTransformStream() | ||
]).then(mustCall()); |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.