-
Notifications
You must be signed in to change notification settings - Fork 29.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add highWaterMark
to the map
and filter
operator
#49249
Conversation
Review requested:
|
942dc4b
to
984dc74
Compare
I'm not sure I understand what this is supposed to do? |
Currently the map operator is using queue based strategy which means the order matter, sometimes you have a situation where you don't care about the order of the items, so you want pool based strategy which means as soon as one finishes another one starts You can have more information in the attached issue. If you can be more specific I would appreciate it 😀 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really understand why this is called pool and what it is trying to achieve exactly.
lib/internal/streams/operators.js
Outdated
@@ -122,6 +151,9 @@ function map(fn, options) { | |||
val.catch(onDone); | |||
} | |||
|
|||
// This is needed for the pool strategy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry Raz I don't understand it either,
Let's take for example this situation, we have 6 items and use concurrency of 3: currently, the execution would look like that:
Notice that 4 does not start when the earliest of 1, 2 and 3 (which is 3 in this case) finish but only after 1 finish the problem is that what if we don't care about the order, and we just wanna do 3 items in parallel in every single moment this is what pool means as opposed to queue which imply the order is kept with this pr, when passing
as soon as something is finished another one starts (in this case 3 finished so 4 start immediately) If you still have questions let me know |
I don't understand why we wait at all for 1 to finish before starting 4, it seems like we should just change that instead of adding an option? |
I agree with @benjamingr, I don't see a reason for the old behavior. |
if we don't keep the old behavior but we modify the new one to push items in the order they came it can cause memory leak when not consuming first items and changing the behavior to not emit items in the order they can is a breaking change that I don't think we should do as it's valuable |
How? What sort of leak?
The API is experimental and my expectation around Readable.from([5000, 1, 1, 1, 1, 5000]).map(async x => {
await setTimeout(x);
}, { concurrency: 4 }).toArray(); But looking at the code my guess is that it does |
sort of leak is that if we don't consume the first item and we just consume all the rest we gonna accumulate data
{
Readable.from([5000, 1, 1, 1, 1, 5000]).map(async x => {
await setTimeout(x);
return x;
}, { concurrency: 4 }).toArray().then(val => console.log('queue', val));
}
{
Readable.from([5000, 1, 1, 1, 1, 5000]).map(async x => {
await setTimeout(x);
return x;
}, { concurrency: 4, pool: true }).toArray().then(val => console.log('pool', val));
} The output is:
so the output is not the same... |
Right, here is an alternative: which tells the stream how much to buffer, we re-add We keep the output order as long as we can depending on |
but what if we don't care about the order? |
Buffering more is fine, changing what we wait for when we consume is fine but we shouldn't change the actual output of |
Changing order of |
What reasonable |
When you say adding |
We call it |
I implemented (Need to update the tests but wanna know if that's what you meant) |
The ones that are not causing failures for the last few bug fix suggestions I made. |
all the bug fixes are already covered from what I checked |
Commit Queue failed- Loading data for nodejs/node/pull/49249 ✔ Done loading data for nodejs/node/pull/49249 ----------------------------------- PR info ------------------------------------ Title stream: add `highWaterMark` to the `map` and `filter` operator (#49249) ⚠ Could not retrieve the email or name of the PR author's from user's GitHub profile! Branch rluvaton:add-stream-map-pool -> nodejs:main Labels stream, needs-ci Commits 10 - stream: add `highWaterMark` for the map operator - stream: update test to check output order - stream: update and rename - stream: filter empty in the right place - stream: remove unnecessary code - stream: allow user to pass `highWaterMark` to `map` and `filter` fns - stream: update docs after adding `highWaterMark` - stream: fix highWaterMark validation and fix test - stream: updated CR - stream: fix concurrency and test Committers 1 - Raz Luvaton <16746759+rluvaton@users.noreply.github.com> PR-URL: https://github.com/nodejs/node/pull/49249 Fixes: https://github.com/nodejs/node/issues/46132 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Robert Nagy ------------------------------ Generated metadata ------------------------------ PR-URL: https://github.com/nodejs/node/pull/49249 Fixes: https://github.com/nodejs/node/issues/46132 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: Robert Nagy -------------------------------------------------------------------------------- ℹ This PR was created on Sat, 19 Aug 2023 23:11:27 GMT ✔ Approvals: 3 ✔ - Matteo Collina (@mcollina) (TSC): https://github.com/nodejs/node/pull/49249#pullrequestreview-1592477937 ✔ - Benjamin Gruenbaum (@benjamingr) (TSC): https://github.com/nodejs/node/pull/49249#pullrequestreview-1593092273 ✔ - Robert Nagy (@ronag) (TSC): https://github.com/nodejs/node/pull/49249#pullrequestreview-1593077530 ✔ Last GitHub CI successful ℹ Last Full PR CI on 2023-08-24T09:20:44Z: https://ci.nodejs.org/job/node-test-pull-request/53539/ - Querying data for job/node-test-pull-request/53539/ ✔ Last Jenkins CI successful -------------------------------------------------------------------------------- ✔ No git cherry-pick in progress ✔ No git am in progress ✔ No git rebase in progress -------------------------------------------------------------------------------- - Bringing origin/main up to date... From https://github.com/nodejs/node * branch main -> FETCH_HEAD ✔ origin/main is now up-to-date - Downloading patch for 49249 From https://github.com/nodejs/node * branch refs/pull/49249/merge -> FETCH_HEAD ✔ Fetched commits as da197d189021..4b0707997951 -------------------------------------------------------------------------------- [main 0034c0e5f8] stream: add `highWaterMark` for the map operator Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sun Aug 20 01:42:56 2023 +0300 2 files changed, 159 insertions(+), 17 deletions(-) [main d02a503624] stream: update test to check output order Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed Aug 23 18:14:20 2023 +0300 1 file changed, 6 insertions(+), 3 deletions(-) [main d41d4506f7] stream: update and rename Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed Aug 23 18:16:36 2023 +0300 1 file changed, 8 insertions(+), 12 deletions(-) [main 281e41769b] stream: filter empty in the right place Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed Aug 23 21:46:58 2023 +0300 1 file changed, 5 insertions(+), 4 deletions(-) [main bd13b8d45a] stream: remove unnecessary code Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu Aug 24 00:16:31 2023 +0300 1 file changed, 2 insertions(+), 3 deletions(-) [main 1e131966c9] stream: allow user to pass `highWaterMark` to `map` and `filter` fns Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu Aug 24 00:31:55 2023 +0300 2 files changed, 60 insertions(+), 5 deletions(-) Auto-merging doc/api/stream.md [main 0b8bc71a8c] stream: update docs after adding `highWaterMark` Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu Aug 24 00:43:55 2023 +0300 1 file changed, 12 insertions(+) [main d2d8025628] stream: fix highWaterMark validation and fix test Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu Aug 24 10:38:54 2023 +0300 3 files changed, 6 insertions(+), 3 deletions(-) [main d78a1158ee] stream: updated CR Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu Aug 24 10:57:31 2023 +0300 1 file changed, 4 insertions(+), 4 deletions(-) [main 89215b6aa9] stream: fix concurrency and test Author: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu Aug 24 11:22:42 2023 +0300 2 files changed, 13 insertions(+), 14 deletions(-) ✔ Patches applied There are 10 commits in the PR. Attempting autorebase. Rebasing (2/20)https://github.com/nodejs/node/actions/runs/5963058918 |
Landed in b0f4233 |
this is done so we don't wait for the first items to finish before starting new ones Fixes: #46132 Co-authored-by: Robert Nagy <ronagy@icloud.com> PR-URL: #49249 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
this is done so we don't wait for the first items to finish before starting new ones Fixes: nodejs#46132 Co-authored-by: Robert Nagy <ronagy@icloud.com> PR-URL: nodejs#49249 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
this is done so we don't wait for the first items to finish before starting new ones Fixes: #46132 Co-authored-by: Robert Nagy <ronagy@icloud.com> PR-URL: #49249 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
this is done so we don't wait for the first items to finish before starting new ones Fixes: nodejs/node#46132 Co-authored-by: Robert Nagy <ronagy@icloud.com> PR-URL: nodejs/node#49249 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
this is done so we don't wait for the first items to finish before starting new ones Fixes: nodejs/node#46132 Co-authored-by: Robert Nagy <ronagy@icloud.com> PR-URL: nodejs/node#49249 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
Fixes: #46132
Co-authored-by: Robert Nagy ronagy@icloud.com