Skip to content

Readable.flatMap concurrency isn't working as expected #52796

Open
@MoLow

Description

@MoLow

Version

v22.0.0

Platform

Darwin Moshes-MBP.localdomain 23.3.0 Darwin Kernel Version 23.3.0: Wed Dec 20 21:30:44 PST 2023; root:xnu-10002.81.5~7/RELEASE_ARM64_T6000 arm64

Subsystem

stream

What steps will reproduce the bug?

Run this code:

const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
const mapped = base.flatMap(async function * (x) {
  await setTimeout(1000);
  yield x * 2;
}, { concurrency: 200 });

(async () => {
  for await (const chunk of mapped) {
    console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
  }
})();

How often does it reproduce? Is there a required condition?

allways

What is the expected behavior? Why is that the expected behavior?

since concurrency is set to 200 - I expect all the chunks to be emitted together, however there is a second between each chuck:

{ chunk: 2 } 1
{ chunk: 4 } 2
{ chunk: 6 } 3
{ chunk: 8 } 4
{ chunk: 10 } 5

What do you see instead?

{ chunk: 2 } 1
{ chunk: 4 } 2
{ chunk: 6 } 3
{ chunk: 8 } 4
{ chunk: 10 } 5

Additional information

when running similar code using Readable.map the concurrency works as expected:

const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

const start = Date.now();
const base = Readable.from([1, 2, 3, 4, 5], { highWaterMark: 100 });
const mapped = base.map(async function(x) {
  await setTimeout(1000);
  return x * 2;
}, { concurrency: 200 });

(async () => {
  for await (const chunk of mapped) {
    console.log({ chunk }, Math.floor((Date.now() - start) / 1000));
  }
})();

results in

{ chunk: 2 } 1
{ chunk: 4 } 1
{ chunk: 6 } 1
{ chunk: 8 } 1
{ chunk: 10 } 1

Metadata

Metadata

Assignees

No one assigned

    Labels

    streamIssues and PRs related to the stream subsystem.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions