Open
Description
Version
v22.0.0
Platform
Darwin Moshes-MBP.localdomain 23.3.0 Darwin Kernel Version 23.3.0: Wed Dec 20 21:30:44 PST 2023; root:xnu-10002.81.5~7/RELEASE_ARM64_T6000 arm64
Subsystem
stream
What steps will reproduce the bug?
Run this code:
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');
const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
const mapped = base.flatMap(async function * (x) {
await setTimeout(1000);
yield x * 2;
}, { concurrency: 200 });
(async () => {
for await (const chunk of mapped) {
console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
}
})();
How often does it reproduce? Is there a required condition?
allways
What is the expected behavior? Why is that the expected behavior?
since concurrency
is set to 200 - I expect all the chunks to be emitted together, however there is a second between each chuck:
{ chunk: 2 } 1
{ chunk: 4 } 2
{ chunk: 6 } 3
{ chunk: 8 } 4
{ chunk: 10 } 5
What do you see instead?
{ chunk: 2 } 1
{ chunk: 4 } 2
{ chunk: 6 } 3
{ chunk: 8 } 4
{ chunk: 10 } 5
Additional information
when running similar code using Readable.map
the concurrency works as expected:
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');
const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
const mapped = base.map(async function(x) {
await setTimeout(1000);
return x * 2;
}, { concurrency: 200 });
(async () => {
for await (const chunk of mapped) {
console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
}
})();
results in
{ chunk: 2 } 1
{ chunk: 4 } 1
{ chunk: 6 } 1
{ chunk: 8 } 1
{ chunk: 10 } 1