Skip to content

Commit 255e65c

Browse files
MoLowbenjamingr
andcommitted
stream: fix flatMap concurrency
Co-Authored-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent b876e00 commit 255e65c

File tree

2 files changed

+57
-38
lines changed

2 files changed

+57
-38
lines changed

lib/internal/streams/operators.js

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ const {
1010
PromisePrototypeThen,
1111
PromiseReject,
1212
PromiseResolve,
13+
SafeSet,
1314
Symbol,
15+
SymbolAsyncIterator,
16+
SymbolIterator,
1417
} = primordials;
1518

1619
const { AbortController, AbortSignal } = require('internal/abort_controller');
@@ -39,6 +42,7 @@ const { isWritable, isNodeStream } = require('internal/streams/utils');
3942

4043
const kEmpty = Symbol('kEmpty');
4144
const kEof = Symbol('kEof');
45+
const kFlatMap = Symbol('kFlatMap');
4246

4347
function compose(stream, options) {
4448
if (options != null) {
@@ -92,11 +96,19 @@ function map(fn, options) {
9296

9397
highWaterMark += concurrency;
9498

99+
const flatMap = options?.[kFlatMap] != null;
100+
95101
return async function* map() {
96102
const signal = AbortSignal.any([options?.signal].filter(Boolean));
97103
const stream = this;
98104
const queue = [];
99105
const signalOpt = { signal };
106+
const baseIterator = (async function* baseIterator() {
107+
for await (const value of stream) {
108+
yield { result: fn(value, signalOpt) };
109+
}
110+
})();
111+
const iterators = new SafeSet([baseIterator]);
100112

101113
let next;
102114
let resume;
@@ -125,45 +137,54 @@ function map(fn, options) {
125137
}
126138
}
127139

140+
function addIterator(result) {
141+
if (result && (result[SymbolAsyncIterator] || result[SymbolIterator])) {
142+
const iterator = result[SymbolAsyncIterator] ? result[SymbolAsyncIterator]() : result[SymbolIterator]();
143+
iterators.add(iterator);
144+
return kEmpty;
145+
}
146+
return result;
147+
}
148+
128149
async function pump() {
129150
try {
130-
for await (let val of stream) {
131-
if (done) {
132-
return;
133-
}
134-
135-
if (signal.aborted) {
136-
throw new AbortError();
137-
}
138-
139-
try {
140-
val = fn(val, signalOpt);
141-
142-
if (val === kEmpty) {
143-
continue;
151+
while (iterators.size > 0) {
152+
for (const iterator of iterators) {
153+
if (done) {
154+
return;
144155
}
145156

146-
val = PromiseResolve(val);
147-
} catch (err) {
148-
val = PromiseReject(err);
149-
}
150-
151-
cnt += 1;
157+
if (signal.aborted) {
158+
throw new AbortError();
159+
}
160+
let val = PromisePrototypeThen(PromiseResolve(iterator.next()), ({ value, done }) => {
161+
if (done) {
162+
iterators.delete(iterator);
163+
return kEmpty;
164+
}
165+
return iterator === baseIterator ? value.result : value;
166+
});
152167

153-
PromisePrototypeThen(val, afterItemProcessed, onCatch);
168+
if (flatMap && baseIterator === iterator) {
169+
val = PromisePrototypeThen(val, addIterator);
170+
}
171+
PromisePrototypeThen(val, afterItemProcessed, onCatch);
172+
cnt += 1;
173+
queue.push(val);
154174

155-
queue.push(val);
156-
if (next) {
157-
next();
158-
next = null;
159-
}
175+
if (next) {
176+
next();
177+
next = null;
178+
}
160179

161-
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
162-
await new Promise((resolve) => {
163-
resume = resolve;
164-
});
180+
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
181+
await new Promise((resolve) => {
182+
resume = resolve;
183+
});
184+
}
165185
}
166186
}
187+
167188
queue.push(kEof);
168189
} catch (err) {
169190
const val = PromiseReject(err);
@@ -343,12 +364,10 @@ async function toArray(options) {
343364
}
344365

345366
function flatMap(fn, options) {
346-
const values = map.call(this, fn, options);
347-
return async function* flatMap() {
348-
for await (const val of values) {
349-
yield* val;
350-
}
351-
}.call(this);
367+
if (options != null) {
368+
validateObject(options, 'options');
369+
}
370+
return map.call(this, fn, { ...options, [kFlatMap]: true });
352371
}
353372

354373
function toIntegerOrInfinity(number) {

test/parallel/test-stream-flatMap.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ function oneTo5() {
7373
{
7474
// Concurrency + AbortSignal
7575
const ac = new AbortController();
76-
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
76+
const stream = oneTo5().flatMap(common.mustCall(async (_, { signal }) => {
7777
await setTimeout(100, { signal });
78-
}), { signal: ac.signal, concurrency: 2 });
78+
}, 2), { signal: ac.signal, concurrency: 2 });
7979
// pump
8080
assert.rejects(async () => {
8181
for await (const item of stream) {

0 commit comments

Comments
 (0)