From 942dc4ba7eedc02f070dffb580933e1bb2a5fb6d Mon Sep 17 00:00:00 2001
From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
Date: Sun, 20 Aug 2023 01:42:56 +0300
Subject: [PATCH] stream: add `pool` option to the map operator

Fixes: https://github.com/nodejs/node/issues/46132
---
 lib/internal/streams/operators.js | 103 +++++++++++++++++-
 test/parallel/test-stream-map.js  | 155 ++++++++++++++++++++++++++++++
 2 files changed, 255 insertions(+), 3 deletions(-)

diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js
index 8f4797da5dd519..9469404959d068 100644
--- a/lib/internal/streams/operators.js
+++ b/lib/internal/streams/operators.js
@@ -23,6 +23,7 @@ const {
   addAbortSignalNoValidate,
 } = require('internal/streams/add-abort-signal');
 const { isWritable, isNodeStream } = require('internal/streams/utils');
+const { createDeferredPromise } = require('internal/util');
 
 const {
   ArrayPrototypePush,
@@ -33,6 +34,7 @@ const {
   Promise,
   PromiseReject,
   PromisePrototypeThen,
+  PromiseResolve,
   Symbol,
 } = primordials;
 
@@ -64,6 +66,44 @@ function compose(stream, options) {
   return composedStream;
 }
 
+/**
+ * Races the entries of `promises` while letting further entries join the
+ * race after it has started.
+ * @param {Array} promises Values or promises taking part in the race.
+ * @returns {{ promise: Promise, addToRace: Function }} `promise` settles with
+ *   the first entry to settle: it fulfills with `{ value, i }` (`i` being the
+ *   entry's index in `promises`) or rejects with the entry's rejection reason.
+ *   `addToRace` enters one more value or promise into the race.
+ */
+function dynamicPromiseRace(promises) {
+  const { promise, resolve, reject } = createDeferredPromise();
+
+  for (let i = 0; i < promises.length; i++) {
+    PromisePrototypeThen(PromiseResolve(promises[i]), (val) => resolve({
+      __proto__: null,
+      value: val,
+      i,
+    }), reject);
+  }
+
+  return {
+    __proto__: null,
+    promise,
+    addToRace: (newValueInQueue) => {
+      // The caller invokes this right before pushing the value onto
+      // `promises`, so the current length is the index it will occupy.
+      const i = promises.length;
+      // Wrap with PromiseResolve(), like the initial entries above, so that
+      // non-promise values (e.g. from a synchronous mapper) are accepted.
+      PromisePrototypeThen(PromiseResolve(newValueInQueue), (value) => resolve({
+        __proto__: null,
+        value,
+        i,
+      }), reject);
+    },
+  };
+}
+
 function map(fn, options) {
   if (typeof fn !== 'function') {
     throw new ERR_INVALID_ARG_TYPE(
@@ -76,12 +116,12 @@ function map(fn, options) {
     validateAbortSignal(options.signal, 'options.signal');
   }
 
-  let concurrency = 1;
+  let concurrency = options?.pool ? 2 : 1;
   if (options?.concurrency != null) {
     concurrency = MathFloor(options.concurrency);
   }
 
-  validateInteger(concurrency, 'concurrency', 1);
+  validateInteger(concurrency, 'concurrency', options?.pool ? 2 : 1);
 
   return async function* map() {
     const signal = AbortSignal.any([options?.signal].filter(Boolean));
@@ -92,6 +132,7 @@ function map(fn, options) {
     let next;
     let resume;
     let done = false;
+    let addToRace;
 
     function onDone() {
       done = true;
@@ -122,6 +163,9 @@ function map(fn, options) {
             val.catch(onDone);
           }
 
+          // Let the pool strategy's pending race (if any) observe this value
+          addToRace?.(val);
+
           queue.push(val);
           if (next) {
             next();
@@ -150,7 +194,52 @@ function map(fn, options) {
 
     pump();
 
-    try {
+    // Emits results as they settle: each round races every queued entry and
+    // removes the winner from the queue.
+    async function* poolStrategy() {
+      while (true) {
+        while (queue.length > 0) {
+          let promise;
+
+          // Start a fresh race over everything queued so far; values queued
+          // while it is pending join it through `addToRace`.
+          addToRace = null;
+          ({ promise, addToRace } = dynamicPromiseRace(queue));
+
+          const { i, value } = await promise;
+
+          queue.splice(i, 1);
+
+          if (value === kEof) {
+            continue;
+          }
+
+          if (signal.aborted) {
+            throw new AbortError();
+          }
+
+          if (value !== kEmpty) {
+            yield value;
+          }
+
+          if (resume) {
+            resume();
+            resume = null;
+          }
+        }
+
+        if (done) {
+          return;
+        }
+
+        await new Promise((resolve) => {
+          next = resolve;
+        });
+      }
+    }
+
+    // Awaits the entry at the head of the queue before any later one.
+    async function* queueStrategy() {
       while (true) {
         while (queue.length > 0) {
           const val = await queue[0];
@@ -178,6 +267,14 @@ function map(fn, options) {
           next = resolve;
         });
       }
+    }
+
+    try {
+      if (options?.pool) {
+        yield* poolStrategy();
+      } else {
+        yield* queueStrategy();
+      }
     } finally {
       done = true;
       if (resume) {
diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js
index ba0571fe3a7b95..399d62b9fe7429 100644
--- a/test/parallel/test-stream-map.js
+++ b/test/parallel/test-stream-map.js
@@ -8,6 +8,25 @@ const assert = require('assert');
 const { once } = require('events');
 const { setTimeout } = require('timers/promises');
 
+function createDependentPromises(n) {
+  const promiseAndResolveArray = [];
+
+  for (let i = 0; i < n; i++) {
+    let res;
+    const promise = new Promise((resolve) => {
+      if (i === 0) {
+        res = resolve;
+        return;
+      }
+      res = () => promiseAndResolveArray[i - 1][0].then(resolve);
+    });
+
+    promiseAndResolveArray.push([promise, res]);
+  }
+
+  return promiseAndResolveArray;
+}
+
 {
   // Map works on synchronous streams with a synchronous mapper
   const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
@@ -173,15 +192,151 @@ const { setTimeout } = require('timers/promises');
   })().then(common.mustCall());
 }
 
+
+{
+  // Simple pool-based concurrency
+  const finishOrder = [];
+
+  const promises = createDependentPromises(4);
+
+  const raw = Readable.from([2, 0, 1, 3]);
+  const stream = raw.map(async (item) => {
+    const [promise, resolve] = promises[item];
+    resolve();
+
+    await promise;
+    finishOrder.push(item);
+    return item;
+  }, { concurrency: 2, pool: true });
+
+  (async () => {
+    await stream.toArray();
+
+    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
+  })().then(common.mustCall(), common.mustNotCall());
+}
+
+{
+  // Pool-based concurrency with a lot of items and large concurrency
+  const finishOrder = [];
+
+  const promises = createDependentPromises(20);
+
+  const raw = Readable.from([11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19]);
+  // Expected pool contents at each step (`next` is the item that finishes next):
+  // 11, 1, 0, 3, 4     | next: 0
+  // 11, 1, 3, 4, 2     | next: 1
+  // 11, 3, 4, 2, 5     | next: 2
+  // 11, 3, 4, 5, 7     | next: 3
+  // 11, 4, 5, 7, 8     | next: 4
+  // 11, 5, 7, 8, 9     | next: 5
+  // 11, 7, 8, 9, 6     | next: 6
+  // 11, 7, 8, 9, 10    | next: 7
+  // 11, 8, 9, 10, 12   | next: 8
+  // 11, 9, 10, 12, 13  | next: 9
+  // 11, 10, 12, 13, 18 | next: 10
+  // 11, 12, 13, 18, 15 | next: 11
+  // 12, 13, 18, 15, 16 | next: 12
+  // 13, 18, 15, 16, 17 | next: 13
+  // 18, 15, 16, 17, 14 | next: 14
+  // 18, 15, 16, 17, 19 | next: 15
+  // 18, 16, 17, 19     | next: 16
+  // 18, 17, 19         | next: 17
+  // 18, 19             | next: 18
+  // 19                 | next: 19
+  //
+
+  const stream = raw.map(async (item) => {
+    const [promise, resolve] = promises[item];
+    resolve();
+
+    await promise;
+    finishOrder.push(item);
+    return item;
+  }, { concurrency: 5, pool: true });
+
+  (async () => {
+    await stream.toArray();
+
+    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
+  })().then(common.mustCall(), common.mustNotCall());
+}
+
+{
+  // Pool-based concurrency where there is a delay between the first and the next item in the pool
+  const finishOrder = [];
+
+  const raw = Readable.from((async function *() {
+    yield 200;
+
+    // Making sure the first item (200) finishes before the next item (0) starts
+    // this is to make sure we don't wait for the pool to be filled before starting
+    await setTimeout(500);
+
+    yield 0;
+    yield 1;
+  })());
+
+  let start;
+  let delaysBetweenFirstAndNextPoolItem;
+
+  const stream = raw
+    .map(async (item) => {
+      await setTimeout(item);
+      finishOrder.push(item);
+
+      return item;
+    }, { concurrency: 2, pool: true })
+    .map((item) => {
+      if (item === 200) {
+        start = Date.now();
+      }
+
+      if (item === 0) {
+        delaysBetweenFirstAndNextPoolItem = Date.now() - start;
+      }
+
+      return item;
+    });
+
+  (async () => {
+    await stream.toArray();
+
+    assert.deepStrictEqual(finishOrder, [200, 0, 1]);
+    // More than 250ms because the first item waits for 200ms and the next item has a delay of 500ms
+    assert.ok(delaysBetweenFirstAndNextPoolItem > 250, new Error(`delay between first and next item in the pool should be more than 250ms but instead got ${delaysBetweenFirstAndNextPoolItem}`));
+  })().then(common.mustCall(), common.mustNotCall());
+}
+
+{
+  // Pool-based concurrency with a synchronous mapper
+  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x, {
+    concurrency: 2,
+    pool: true,
+  });
+
+  (async () => {
+    const result = await stream.toArray();
+
+    // The pool emits in completion order, so compare order-insensitively
+    assert.deepStrictEqual(result.sort((a, b) => a - b), [2, 4, 6, 8, 10]);
+  })().then(common.mustCall(), common.mustNotCall());
+}
+
 {
   // Error cases
   assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
   assert.throws(() => Readable.from([1]).map((x) => x, {
     concurrency: 'Foo'
   }), /ERR_OUT_OF_RANGE/);
+  assert.throws(() => Readable.from([1]).map((x) => x, {
+    concurrency: 1,
+    pool: true
+  }), /ERR_OUT_OF_RANGE/);
   assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
   assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
 }
+
 {
   // Test result is a Readable
   const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);