Skip to content

Commit 984dc74

Browse files
committed
stream: add pool option to the map operator
Fixes: #46132
1 parent 5026293 commit 984dc74

File tree

2 files changed

+223
-3
lines changed

2 files changed

+223
-3
lines changed

lib/internal/streams/operators.js

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const {
2323
addAbortSignalNoValidate,
2424
} = require('internal/streams/add-abort-signal');
2525
const { isWritable, isNodeStream } = require('internal/streams/utils');
26+
const { createDeferredPromise } = require('internal/util');
2627

2728
const {
2829
ArrayPrototypePush,
@@ -33,6 +34,7 @@ const {
3334
Promise,
3435
PromiseReject,
3536
PromisePrototypeThen,
37+
PromiseResolve,
3638
Symbol,
3739
} = primordials;
3840

@@ -64,6 +66,32 @@ function compose(stream, options) {
6466
return composedStream;
6567
}
6668

69+
/**
 * Builds a promise "race" that further contenders can join after it started.
 *
 * Every entry of `promises` competes to settle the returned `promise`. The
 * first fulfillment resolves it with `{ value, i }`, where `i` is the index of
 * the winning entry in `promises`; the first rejection rejects it with the
 * same reason. Anything that settles afterwards is ignored, because a promise
 * can only be settled once.
 *
 * Entries may be promises or plain values; plain values are wrapped with
 * `PromiseResolve()` before they take part.
 * @param {Array<any>} promises Initial contenders. The array is not copied:
 *   `addToRace` reads its current length to compute the index of a late entry.
 * @returns {{
 *   promise: Promise<{ value: any, i: number }>,
 *   addToRace: (newValueInQueue: any) => void,
 * }}
 */
function dynamicPromiseRace(promises) {
  const { promise, resolve, reject } = createDeferredPromise();

  // Single entry point so initial and late contenders are treated alike.
  const enter = (contender, i) => {
    // PromiseResolve() lets plain values take part; invoking
    // PromisePrototypeThen() on a non-promise receiver would throw.
    PromisePrototypeThen(PromiseResolve(contender), (value) => resolve({
      __proto__: null,
      value,
      i,
    }), reject);
  };

  for (let i = 0; i < promises.length; i++) {
    enter(promises[i], i);
  }

  return {
    __proto__: null,
    promise,
    addToRace: (newValueInQueue) => {
      // The new entry is reported with the index it will have once the caller
      // appends it to `promises`, i.e. the current length of the array.
      enter(newValueInQueue, promises.length);
    },
  };
}
94+
6795
function map(fn, options) {
6896
if (typeof fn !== 'function') {
6997
throw new ERR_INVALID_ARG_TYPE(
@@ -76,12 +104,12 @@ function map(fn, options) {
76104
validateAbortSignal(options.signal, 'options.signal');
77105
}
78106

79-
let concurrency = 1;
107+
let concurrency = options?.pool ? 2 : 1;
80108
if (options?.concurrency != null) {
81109
concurrency = MathFloor(options.concurrency);
82110
}
83111

84-
validateInteger(concurrency, 'concurrency', 1);
112+
validateInteger(concurrency, 'concurrency', options?.pool ? 2 : 1);
85113

86114
return async function* map() {
87115
const signal = AbortSignal.any([options?.signal].filter(Boolean));
@@ -92,6 +120,7 @@ function map(fn, options) {
92120
let next;
93121
let resume;
94122
let done = false;
123+
let addToRace;
95124

96125
function onDone() {
97126
done = true;
@@ -122,6 +151,9 @@ function map(fn, options) {
122151
val.catch(onDone);
123152
}
124153

154+
// This is needed for the pool strategy
155+
addToRace?.(val);
156+
125157
queue.push(val);
126158
if (next) {
127159
next();
@@ -150,7 +182,47 @@ function map(fn, options) {
150182

151183
pump();
152184

153-
try {
185+
// Pool strategy: yields whichever queued entry settles first rather than
// waiting for the entry at the head of the queue.
async function* poolStrategy() {
  for (;;) {
    while (queue.length > 0) {
      // Start a fresh race over everything currently queued and publish its
      // `addToRace` so entries queued while we are waiting can join it.
      const race = dynamicPromiseRace(queue);
      addToRace = race.addToRace;

      const winner = await race.promise;

      // Drop the settled entry; the remaining ones are raced again on the
      // next iteration.
      queue.splice(winner.i, 1);

      const { value } = winner;

      // NOTE(review): kEof looks like the end-of-input sentinel; it is only
      // discarded here so that entries still pending can be drained - confirm.
      if (value === kEof) {
        continue;
      }

      if (signal.aborted) {
        throw new AbortError();
      }

      if (value !== kEmpty) {
        yield value;
      }

      // A queue slot was freed: call the pending `resume` callback, if any.
      if (resume) {
        resume();
        resume = null;
      }
    }

    if (done) {
      return;
    }

    // Queue is empty but not finished: park until `next` is invoked.
    await new Promise((resolve) => {
      next = resolve;
    });
  }
}
224+
225+
async function* queueStrategy() {
154226
while (true) {
155227
while (queue.length > 0) {
156228
const val = await queue[0];
@@ -178,6 +250,14 @@ function map(fn, options) {
178250
next = resolve;
179251
});
180252
}
253+
}
254+
255+
try {
256+
if (options?.pool) {
257+
yield* await poolStrategy();
258+
} else {
259+
yield* await queueStrategy();
260+
}
181261
} finally {
182262
done = true;
183263
if (resume) {

test/parallel/test-stream-map.js

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,25 @@ const assert = require('assert');
88
const { once } = require('events');
99
const { setTimeout } = require('timers/promises');
1010

11+
/**
 * Creates `n` promises that can only settle in index order.
 *
 * Each element of the returned array is a `[promise, trigger]` pair. The
 * trigger of the first pair is the promise's own `resolve` function. Every
 * later trigger resolves its promise only once the previous promise in the
 * array has fulfilled (and with that promise's value), and returns the
 * chained promise.
 * @param {number} n Number of pairs to create.
 * @returns {Array<[Promise<any>, Function]>}
 */
function createDependentPromises(n) {
  const chain = [];

  for (let index = 0; index < n; index += 1) {
    let trigger;
    const promise = new Promise((settle) => {
      // The executor runs synchronously, so `trigger` is set before the push.
      trigger = index === 0 ?
        settle :
        () => chain[index - 1][0].then(settle);
    });

    chain.push([promise, trigger]);
  }

  return chain;
}
29+
1130
{
1231
// Map works on synchronous streams with a synchronous mapper
1332
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
@@ -173,15 +192,136 @@ const { setTimeout } = require('timers/promises');
173192
})().then(common.mustCall());
174193
}
175194

195+
196+
{
197+
// Simple pool-based concurrency
198+
const finishOrder = [];
199+
200+
const promises = createDependentPromises(4);
201+
202+
const raw = Readable.from([2, 0, 1, 3]);
203+
const stream = raw.map(async (item) => {
204+
const [promise, resolve] = promises[item];
205+
resolve();
206+
207+
await promise;
208+
finishOrder.push(item);
209+
return item;
210+
}, { concurrency: 2, pool: true });
211+
212+
(async () => {
213+
await stream.toArray();
214+
215+
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
216+
})().then(common.mustCall(), common.mustNotCall());
217+
}
218+
219+
{
220+
// Pool-based concurrency with a lot of items and large concurrency
221+
const finishOrder = [];
222+
223+
const promises = createDependentPromises(20);
224+
225+
const raw = Readable.from([11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19]);
226+
// Should be
227+
// 11, 1, 0, 3, 4 | next: 0
228+
// 11, 1, 3, 4, 2 | next: 1
229+
// 11, 3, 4, 2, 5 | next: 2
230+
// 11, 3, 4, 5, 7 | next: 3
231+
// 11, 4, 5, 7, 8 | next: 4
232+
// 11, 5, 7, 8, 9 | next: 5
233+
// 11, 7, 8, 9, 6 | next: 6
234+
// 11, 7, 8, 9, 10 | next: 7
235+
// 11, 8, 9, 10, 12 | next: 8
236+
// 11, 9, 10, 12, 13 | next: 9
237+
// 11, 10, 12, 13, 18 | next: 10
238+
// 11, 12, 13, 18, 15 | next: 11
239+
// 12, 13, 18, 15, 16 | next: 12
240+
// 13, 18, 15, 16, 17 | next: 13
241+
// 18, 15, 16, 17, 14 | next: 14
242+
// 18, 15, 16, 17, 19 | next: 15
243+
// 18, 16, 17, 19 | next: 16
244+
// 18, 17, 19 | next: 17
245+
// 18, 19 | next: 18
246+
// 19 | next: 19
247+
//
248+
249+
const stream = raw.map(async (item) => {
250+
const [promise, resolve] = promises[item];
251+
resolve();
252+
253+
await promise;
254+
finishOrder.push(item);
255+
return item;
256+
}, { concurrency: 5, pool: true });
257+
258+
(async () => {
259+
await stream.toArray();
260+
261+
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
262+
})().then(common.mustCall(), common.mustNotCall());
263+
}
264+
265+
{
266+
// Pool-based concurrency where there is a delay between the first and the next item in the pool
267+
const finishOrder = [];
268+
269+
const raw = Readable.from((async function *() {
270+
yield 200;
271+
272+
// Making sure the first item (200) finishes before the next item (0) starts;
273+
// this is to make sure we don't wait for the pool to be filled before starting
274+
await setTimeout(500);
275+
276+
yield 0;
277+
yield 1;
278+
})());
279+
280+
let start;
281+
let delaysBetweenFirstAndNextPoolItem;
282+
283+
const stream = raw
284+
.map(async (item) => {
285+
await setTimeout(item);
286+
finishOrder.push(item);
287+
288+
return item;
289+
}, { concurrency: 2, pool: true })
290+
.map((item) => {
291+
if (item === 200) {
292+
start = Date.now();
293+
}
294+
295+
if (item === 0) {
296+
delaysBetweenFirstAndNextPoolItem = Date.now() - start;
297+
}
298+
299+
return item;
300+
});
301+
302+
(async () => {
303+
await stream.toArray();
304+
305+
assert.deepStrictEqual(finishOrder, [200, 0, 1]);
306+
// More than 250ms because the first item finishes after 200ms while the next item is only produced after a 500ms delay
307+
assert.ok(delaysBetweenFirstAndNextPoolItem > 250, new Error(`delay between first and next item in the pool should be more than 250ms but instead got ${delaysBetweenFirstAndNextPoolItem}`));
308+
})().then(common.mustCall(), common.mustNotCall());
309+
}
310+
176311
{
177312
// Error cases
178313
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
179314
assert.throws(() => Readable.from([1]).map((x) => x, {
180315
concurrency: 'Foo'
181316
}), /ERR_OUT_OF_RANGE/);
317+
assert.throws(() => Readable.from([1]).map((x) => x, {
318+
concurrency: 1,
319+
pool: true
320+
}), /ERR_OUT_OF_RANGE/);
182321
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
183322
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
184323
}
324+
185325
{
186326
// Test result is a Readable
187327
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);

0 commit comments

Comments
 (0)