From 3f0bcfb203cef1d507d22261493d5e0195308440 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Sat, 8 Jan 2022 12:46:07 +0200 Subject: [PATCH] stream: add forEach method Add a `forEach` method to readable streams to enable concurrent iteration and align with the iterator-helpers proposal. Co-Authored-By: Robert Nagy PR-URL: https://github.com/nodejs/node/pull/41445 Reviewed-By: Matteo Collina Reviewed-By: Robert Nagy --- doc/api/stream.md | 63 +++++++++++++++++++- lib/internal/streams/operators.js | 28 +++++++-- lib/stream.js | 15 ++++- test/parallel/test-stream-forEach.js | 86 ++++++++++++++++++++++++++++ 4 files changed, 182 insertions(+), 10 deletions(-) create mode 100644 test/parallel/test-stream-forEach.js diff --git a/doc/api/stream.md b/doc/api/stream.md index e2bded5c62a3eb..bd4573e487f1ad 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1751,7 +1751,7 @@ added: REPLACEME * `signal` {AbortSignal} aborted if the stream is destroyed allowing to abort the `fn` call early. * `options` {Object} - * `concurrency` {number} the maximal concurrent invocation of `fn` to call + * `concurrency` {number} the maximum concurrent invocation of `fn` to call on the stream at once. **Default:** `1`. * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. @@ -1795,7 +1795,7 @@ added: REPLACEME * `signal` {AbortSignal} aborted if the stream is destroyed allowing to abort the `fn` call early. * `options` {Object} - * `concurrency` {number} the maximal concurrent invocation of `fn` to call + * `concurrency` {number} the maximum concurrent invocation of `fn` to call on the stream at once. **Default:** `1`. * `signal` {AbortSignal} allows destroying the stream if the signal is aborted. @@ -1830,6 +1830,65 @@ for await (const result of dnsResults) { } ``` +### `readable.forEach(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to call on each item of the stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximum concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Promise} a promise for when the stream has finished. + +This method allows iterating a stream. For each item in the stream the +`fn` function will be called. If the `fn` function returns a promise - that +promise will be `await`ed. + +This method is different from `for await...of` loops in that it can optionally +process items concurrently. In addition, a `forEach` iteration can only be +stopped by having passed a `signal` option and aborting the related +`AbortController` while `for await...of` can be stopped with `break` or +`return`. In either case the stream will be destroyed. + +This method is different from listening to the [`'data'`][] event in that it +uses the [`readable`][] event in the underlying machinary and can limit the +number of concurrent `fn` calls. + +```mjs +import { Readable } from 'stream'; +import { Resolver } from 'dns/promises'; + +// With a synchronous predicate. +for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { + console.log(item); // 3, 4 +} +// With an asynchronous predicate, making at most 2 queries at a time. +const resolver = new Resolver(); +const dnsResults = await Readable.from([ + 'nodejs.org', + 'openjsf.org', + 'www.linuxfoundation.org', +]).map(async (domain) => { + const { address } = await resolver.resolve4(domain, { ttl: true }); + return address; +}, { concurrency: 2 }); +await dnsResults.forEach((result) => { + // Logs result, similar to `for await (const result of dnsResults)` + console.log(result); +}); +console.log('done'); // Stream has finished +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 267cf53740bd7f..c9581f7b6dfe6c 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -23,7 +23,7 @@ const kEof = Symbol('kEof'); async function * map(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], this); + 'fn', ['Function', 'AsyncFunction'], fn); } if (options != null && typeof options !== 'object') { @@ -147,10 +147,23 @@ async function * map(fn, options) { } } +async function forEach(fn, options) { + if (typeof fn !== 'function') { + throw new ERR_INVALID_ARG_TYPE( + 'fn', ['Function', 'AsyncFunction'], fn); + } + async function forEachFn(value, options) { + await fn(value, options); + return kEmpty; + } + // eslint-disable-next-line no-unused-vars + for await (const unused of this.map(forEachFn, options)); +} + async function * filter(fn, options) { if (typeof fn !== 'function') { - throw (new ERR_INVALID_ARG_TYPE( - 'fn', ['Function', 'AsyncFunction'], this)); + throw new ERR_INVALID_ARG_TYPE( + 'fn', ['Function', 'AsyncFunction'], fn); } async function filterFn(value, options) { if (await fn(value, options)) { @@ -160,7 +173,12 @@ async function * filter(fn, options) { } yield* this.map(filterFn, options); } -module.exports = { + +module.exports.streamReturningOperators = { + filter, map, - filter +}; + +module.exports.promiseReturningOperators = { + forEach, }; diff --git a/lib/stream.js b/lib/stream.js index 25dd9f0ba52985..2c3261123af66e 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -31,7 +31,10 @@ const { promisify: { custom: customPromisify }, } = require('internal/util'); -const operators = require('internal/streams/operators'); +const { + streamReturningOperators, + promiseReturningOperators, +} = require('internal/streams/operators'); const compose = require('internal/streams/compose'); const { pipeline } = require('internal/streams/pipeline'); const { destroyer } = require('internal/streams/destroy'); @@ -46,12 +49,18 @@ Stream.isDisturbed = utils.isDisturbed; Stream.isErrored = utils.isErrored; Stream.isReadable = utils.isReadable; Stream.Readable = require('internal/streams/readable'); -for (const key of ObjectKeys(operators)) { - const op = operators[key]; +for (const key of ObjectKeys(streamReturningOperators)) { + const op = streamReturningOperators[key]; Stream.Readable.prototype[key] = function(...args) { return Stream.Readable.from(ReflectApply(op, this, args)); }; } +for (const key of ObjectKeys(promiseReturningOperators)) { + const op = promiseReturningOperators[key]; + Stream.Readable.prototype[key] = function(...args) { + return ReflectApply(op, this, args); + }; +} Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); diff --git a/test/parallel/test-stream-forEach.js b/test/parallel/test-stream-forEach.js new file mode 100644 index 00000000000000..82013554cd2aa8 --- /dev/null +++ b/test/parallel/test-stream-forEach.js @@ -0,0 +1,86 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); +const { setTimeout } = require('timers/promises'); + +{ + // forEach works on synchronous streams with a synchronous predicate + const stream = Readable.from([1, 2, 3]); + const result = [1, 2, 3]; + (async () => { + await stream.forEach((value) => assert.strictEqual(value, result.shift())); + })().then(common.mustCall()); +} + +{ + // forEach works an asynchronous streams + const stream = Readable.from([1, 2, 3]).filter(async (x) => { + await Promise.resolve(); + return true; + }); + const result = [1, 2, 3]; + (async () => { + await stream.forEach((value) => assert.strictEqual(value, result.shift())); + })().then(common.mustCall()); +} + +{ + // forEach works on asynchronous streams with a asynchronous forEach fn + const stream = Readable.from([1, 2, 3]).filter(async (x) => { + await Promise.resolve(); + return true; + }); + const result = [1, 2, 3]; + (async () => { + await stream.forEach(async (value) => { + await Promise.resolve(); + assert.strictEqual(value, result.shift()); + }); + })().then(common.mustCall()); +} + +{ + // Concurrency + AbortSignal + const ac = new AbortController(); + let calls = 0; + const forEachPromise = + Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => { + calls++; + await setTimeout(100, { signal }); + }, { signal: ac.signal, concurrency: 2 }); + // pump + assert.rejects(async () => { + await forEachPromise; + }, { + name: 'AbortError', + }).then(common.mustCall()); + + setImmediate(() => { + ac.abort(); + assert.strictEqual(calls, 2); + }); +} + +{ + // Error cases + assert.rejects(async () => { + await Readable.from([1]).forEach(1); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1]).forEach((x) => x, { + concurrency: 'Foo' + }); + }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); + assert.rejects(async () => { + await Readable.from([1]).forEach((x) => x, 1); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +} +{ + // Test result is a Promise + const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true); + assert.strictEqual(typeof stream.then, 'function'); +}