Skip to content

Readable.map concurrency not running map on next item before previous finishΒ #46132

Closed
@rluvaton

Description

@rluvaton

Version

v18.12.1

Platform

Darwin Razs-MBP 22.2.0 Darwin Kernel Version 22.2.0: Fri Nov 11 02:03:51 PST 2022; root:xnu-8792.61.2~4/RELEASE_ARM64_T6000 arm64

Subsystem

stream

What steps will reproduce the bug?

{
  const finishOrder = [];

  const stream = Readable.from([10, 1, 5, 20]).map(async (item, { signal }) => {
    await setTimeout(item, { signal });
    finishOrder.push(item);
    return item;
  }, { concurrency: 2 });

  (async () => {
    await stream.toArray()

    assert.deepStrictEqual(finishOrder, [1, 5, 10, 20]);
  })().then(common.mustCall());
}
Same idea but with pretty output
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises')

async function run() {
  const all = [];
  const start = Date.now();

  const data = Readable.from([
    [1, 200],
    [2, 500],
    [3, 100],
    [4, 300],
    [5, 600],
    [6, 60],
  ])
    .map(async ([index, timeout]) => {
      const waitedTime = Date.now() - start;

      await setTimeout(timeout)

      return {
        index,
        waitedTime,
        timeTook: timeout,
      };
    }, { concurrency: 3 });


  for await (const item of data) {
    all.push(item);
  }

  printPretty(all)
}

run();


function printPretty(all) {
  for (const item of all) {
    const title = ` ${item.index} `;

    // Reduce the number of dots needed
    const waitedTime = Math.floor(item.waitedTime / 20);
    const timeTook = Math.floor(item.timeTook / 20);

    const line = [
      // If not started late than don't add spaces for padding
      waitedTime === 0 ? '' : ' '.repeat(title.length),

      'β–‘'.repeat(waitedTime),

      title,

      // Running time
      '.'.repeat(timeTook),
    ].join('');

    console.log(line);
  }
}

How often does it reproduce? Is there a required condition?

always

What is the expected behavior?

that as soon as any item finishes it will start the next one until reaching the highWaterMark (maybe we should add it as an option?)

the docs say:

concurrency <number> the maximum concurrent invocation of fn to call on the stream at once. Default: 1.

From Docs - map concurrency param

for the nodejs internal test the test should pass

For the Code with pretty output, the output should look like this:

Symbol meanings:
. (dot) is running
β–‘ is waiting

 1 ..........
 2 .........................
 3 .....
   β–‘β–‘β–‘β–‘β–‘ 4 ...............
   β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘ 5 ..............................
   β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘ 6 ...

(as soon as one finished another one start)

What do you see instead?

For the nodejs internal test

node:internal/process/promises:289
            triggerUncaughtException(err, true /* fromPromise */);
            ^

AssertionError [ERR_ASSERTION]: Expected values to be strictly deep-equal:
+ actual - expected

  [
    1,
+   10,
+   5,
-   5,
-   10,
    20
  ]
    at ~/dev/open-source/node/node-fork/test/parallel/test-stream-map.js:189:12
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  generatedMessage: true,
  code: 'ERR_ASSERTION',
  actual: [ 1, 10, 5, 20 ],
  expected: [ 1, 5, 10, 20 ],
  operator: 'deepStrictEqual'
}
For the Code with pretty output

function not running even though they can

Symbol meanings:
. (dot) is running
β–‘ is waiting


 1 ..........
 2 .........................
 3 .....
   β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘ 4 ...............
   β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘ 5 ..............................
   β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘ 6 ...

Additional information

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions