Readable.map concurrency not running map on next item before previous finish #46132

Closed
rluvaton opened this issue Jan 8, 2023 · 3 comments · Fixed by #49249

Comments

@rluvaton
Member

rluvaton commented Jan 8, 2023

Version

v18.12.1

Platform

Darwin Razs-MBP 22.2.0 Darwin Kernel Version 22.2.0: Fri Nov 11 02:03:51 PST 2022; root:xnu-8792.61.2~4/RELEASE_ARM64_T6000 arm64

Subsystem

stream

What steps will reproduce the bug?

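```js
// From test/parallel/test-stream-map.js (see the stack trace below).
// `common` is Node's internal test helper, so this only runs in the Node repo.
const common = require('../common');
const assert = require('node:assert');
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

{
  const finishOrder = [];

  // Each item sleeps for `item` ms, so shorter items should finish first
  // whenever a concurrency slot is free.
  const stream = Readable.from([10, 1, 5, 20]).map(async (item, { signal }) => {
    // timers/promises setTimeout takes (delay, value, options).
    await setTimeout(item, undefined, { signal });
    finishOrder.push(item);
    return item;
  }, { concurrency: 2 });

  (async () => {
    await stream.toArray();

    assert.deepStrictEqual(finishOrder, [1, 5, 10, 20]);
  })().then(common.mustCall());
}
```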
The same idea, with pretty-printed output:
```js
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

async function run() {
  const all = [];
  const start = Date.now();

  // Each entry is [index, timeout in ms].
  const data = Readable.from([
    [1, 200],
    [2, 500],
    [3, 100],
    [4, 300],
    [5, 600],
    [6, 60],
  ])
    .map(async ([index, timeout]) => {
      // Time from the start of the run until `fn` was invoked for this item
      const waitedTime = Date.now() - start;

      await setTimeout(timeout);

      return {
        index,
        waitedTime,
        timeTook: timeout,
      };
    }, { concurrency: 3 });

  for await (const item of data) {
    all.push(item);
  }

  printPretty(all);
}

run().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

function printPretty(all) {
  for (const item of all) {
    const title = ` ${item.index} `;

    // Reduce the number of characters needed: one per 20 ms
    const waitedTime = Math.floor(item.waitedTime / 20);
    const timeTook = Math.floor(item.timeTook / 20);

    const line = [
      // If the item did not start late, don't add padding spaces
      waitedTime === 0 ? '' : ' '.repeat(title.length),

      // Waiting time before `fn` was invoked
      '░'.repeat(waitedTime),

      title,

      // Running time
      '.'.repeat(timeTook),
    ].join('');

    console.log(line);
  }
}
```

How often does it reproduce? Is there a required condition?

Always.

What is the expected behavior?

As soon as any item finishes, `map` should start the next one, until reaching the highWaterMark (maybe we should add it as an option?).

The docs say:

> `concurrency` <number> the maximum concurrent invocation of `fn` to call on the stream at once. Default: `1`.

(From the docs: `map` `concurrency` parameter.)
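The expected behavior, where a new call starts as soon as any in-flight call settles, is the classic worker-pool pattern. Below is a minimal sketch of it over a plain array (not a stream, and not Node's implementation). `mapEager` is a hypothetical name, and the sketch deliberately omits cancellation and backpressure.

```javascript
const { setTimeout } = require('node:timers/promises');

// Hypothetical helper (not part of Node's API): call `fn` on each item with
// at most `concurrency` calls in flight. A new call starts as soon as ANY
// in-flight call settles; results are returned in input order.
async function mapEager(items, fn, concurrency) {
  if (!(concurrency >= 1)) throw new RangeError('concurrency must be >= 1');
  const results = new Array(items.length);
  let nextIndex = 0;

  // Each "worker" owns one concurrency slot and keeps claiming the next
  // unprocessed index until the input is exhausted.
  async function worker() {
    while (nextIndex < items.length) {
      const i = nextIndex++;
      results[i] = await fn(items[i], i);
    }
  }

  const workerCount = Math.min(concurrency, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Usage: the test's durations scaled by 10 to leave room for timer jitter.
const finishOrder = [];
mapEager([100, 10, 50, 200], async (ms) => {
  await setTimeout(ms);
  finishOrder.push(ms);
  return ms;
}, 2).then((results) => {
  console.log(results);     // [ 100, 10, 50, 200 ] (input order)
  console.log(finishOrder); // [ 10, 50, 100, 200 ] (completion order)
});
```

Because each worker only claims a new index after its own previous call settles, a short item never waits behind a long one that happens to be ahead of it in the input.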

For the Node.js internal test, the test should pass.

For the code with pretty output, the output should look like this:

Symbol meanings:
- `.` (dot) is running
- `░` is waiting

```
 1 ..........
 2 .........................
 3 .....
   ░░░░░ 4 ...............
   ░░░░░░░░░░ 5 ..............................
   ░░░░░░░░░░░░░░░░░░░░░░░ 6 ...
```

(as soon as one finishes, another one starts)

What do you see instead?

For the Node.js internal test:

```
node:internal/process/promises:289
            triggerUncaughtException(err, true /* fromPromise */);
            ^

AssertionError [ERR_ASSERTION]: Expected values to be strictly deep-equal:
+ actual - expected

  [
    1,
+   10,
+   5,
-   5,
-   10,
    20
  ]
    at ~/dev/open-source/node/node-fork/test/parallel/test-stream-map.js:189:12
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  generatedMessage: true,
  code: 'ERR_ASSERTION',
  actual: [ 1, 10, 5, 20 ],
  expected: [ 1, 5, 10, 20 ],
  operator: 'deepStrictEqual'
}
```
For the code with pretty output, `fn` is not invoked for the next items even though concurrency slots are free:

Symbol meanings:
- `.` (dot) is running
- `░` is waiting


```
 1 ..........
 2 .........................
 3 .....
   ░░░░░░░░░░ 4 ...............
   ░░░░░░░░░░░░░░░░░░░░░░░░░ 5 ..............................
   ░░░░░░░░░░░░░░░░░░░░░░░░░ 6 ...
```
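Both outcomes can be reproduced with a simplified, deterministic timeline model. This is a hypothetical sketch, not Node's code: `simulate` and `finishOrder` are made-up helpers, and the `'inOrder'` policy is an assumption about the pre-fix behavior, taken from the fix's commit message ("so we don't wait for the first items to finish before starting new ones"). Under that assumption it yields the `actual: [ 1, 10, 5, 20 ]` of the failing test, and start times of about 200, 500 and 500 ms for items 4, 5 and 6, as in the diagram above.

```javascript
// Hypothetical, simplified timeline model (not Node's implementation).
// Each task is only a duration in ms; `concurrency` slots are available.
//   'eager'   - a queued task starts as soon as ANY running task finishes.
//   'inOrder' - a queued task starts only when the OLDEST started task
//               finishes (the assumed pre-fix behavior).
function simulate(durations, concurrency, policy) {
  if (!(concurrency >= 1)) throw new RangeError('concurrency must be >= 1');
  const tasks = durations.map((ms) => ({ ms, start: null, end: null }));
  const running = []; // indices of started, not-yet-released tasks, oldest first
  let next = 0;
  let now = 0;

  while (next < tasks.length || running.length > 0) {
    // Start queued tasks in every free slot at the current time.
    while (next < tasks.length && running.length < concurrency) {
      tasks[next].start = now;
      tasks[next].end = now + tasks[next].ms;
      running.push(next++);
    }

    // Pick the task whose completion frees the next slot.
    let pos = 0; // 'inOrder': always the oldest started task
    if (policy === 'eager') {
      for (let k = 1; k < running.length; k++) {
        if (tasks[running[k]].end < tasks[running[pos]].end) pos = k;
      }
    }
    const [freed] = running.splice(pos, 1);
    // An oldest task that already finished frees its slot immediately.
    now = Math.max(now, tasks[freed].end);
  }
  return tasks;
}

// Durations of the tasks, sorted by the time they finished.
const finishOrder = (tasks) =>
  [...tasks].sort((a, b) => a.end - b.end).map((t) => t.ms);

console.log(finishOrder(simulate([10, 1, 5, 20], 2, 'eager')));   // [ 1, 5, 10, 20 ]
console.log(finishOrder(simulate([10, 1, 5, 20], 2, 'inOrder'))); // [ 1, 10, 5, 20 ]
console.log(simulate([200, 500, 100, 300, 600, 60], 3, 'inOrder').map((t) => t.start));
// [ 0, 0, 0, 200, 500, 500 ]
```

In the `'inOrder'` model, item 1 (1 ms) finishes early but its slot stays occupied until item 10 ahead of it is done, which is why 5 only starts at 10 ms and ends after 10.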

Additional information

No response

@rluvaton
Member Author

rluvaton commented Jan 8, 2023

Maybe it's not a bug, as the docs don't state the minimum number of concurrent invocations of the map function.
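What the docs guarantee can be checked empirically by instrumenting `fn`. The sketch below (the `measureConcurrency` name and the millisecond durations are illustrative assumptions) records how many calls are in flight each time a new one starts. The exact counts depend on the Node.js version, since they are precisely what this issue is about; the only things it should be relied on to show are that results come back in input order and that the peak never exceeds `concurrency`.

```javascript
const { Readable } = require('node:stream');
const { setTimeout } = require('node:timers/promises');

// Hypothetical helper: run Readable.map over `durations` (ms) and record,
// each time `fn` is invoked, how many invocations are currently in flight.
async function measureConcurrency(durations, concurrency) {
  let inFlight = 0;
  const inFlightAtStart = [];

  const results = await Readable.from(durations)
    .map(async (ms) => {
      inFlight++;
      inFlightAtStart.push(inFlight);
      await setTimeout(ms);
      inFlight--;
      return ms;
    }, { concurrency })
    .toArray();

  return { results, inFlightAtStart, peak: Math.max(...inFlightAtStart) };
}

measureConcurrency([10, 1, 5, 20], 2).then(({ results, inFlightAtStart, peak }) => {
  console.log(results);         // input order is preserved: [ 10, 1, 5, 20 ]
  console.log(inFlightAtStart); // version-dependent: shows when slots were refilled
  console.log(peak);            // never more than `concurrency`
});
```

A value below `concurrency` in `inFlightAtStart` after the first few entries means a slot was free but left unused for a while, which is the behavior reported here.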

@benjamingr
Member

This was intentional, though it can be discussed/changed.

@benjamingr
Member

> maybe we should add it as an option?

If you look at the original PR, it had a highWaterMark; it was removed only for simplicity and can be re-added.

rluvaton added a commit to rluvaton/node that referenced this issue Aug 20, 2023
rluvaton added a commit to rluvaton/node that referenced this issue Aug 20, 2023
rluvaton added a commit to rluvaton/node that referenced this issue Aug 23, 2023
this is done so we don't wait for the first items to
finish before starting new ones

Fixes: nodejs#46132

Co-authored-by: Robert Nagy <ronagy@icloud.com>
nodejs-github-bot pushed a commit that referenced this issue Aug 24, 2023
this is done so we don't wait for the first items to
finish before starting new ones

Fixes: #46132

Co-authored-by: Robert Nagy <ronagy@icloud.com>
PR-URL: #49249
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
UlisesGascon pushed a commit that referenced this issue Sep 10, 2023
alexfernandez pushed a commit to alexfernandez/node that referenced this issue Nov 1, 2023
targos pushed a commit that referenced this issue Nov 27, 2023
sercher added a commit to sercher/graaljs that referenced this issue Apr 25, 2024
sercher added a commit to sercher/graaljs that referenced this issue Apr 25, 2024