-
Notifications
You must be signed in to change notification settings - Fork 29.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: fix flatMap
concurrency
#52816
base: main
Are you sure you want to change the base?
Conversation
Review requested:
|
Co-Authored-By: Benjamin Gruenbaum <benjamingr@gmail.com>
255e65c
to
948b245
Compare
Did you check if the stage 2 proposal requires one way or another? I think it's quite important that our implementation matches the proposal. |
When prototyping together we also explored two other ways to schedule the async iterators and this was nicest. My main concern is this complicating the regular flow for a single operator but I think it’s probably worth it as a bug fix and for correctness |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are many problems here. I think we need a different approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe start with making some better tests?
the section about concurrency doesn't mention much about implementation
can you please elaborate? are your concerns regarding the approach or the implementation?
I have to think a little what tests we can add, do you have any suggestion? |
Also round robin is probably wrong since it won't enforce the order. We would need to add some other operator which does not enfore order. |
Order is not enforced anyway with concurrency/flatMap? - round robin had the best fairness out of the attempts we've had (prioritize stream and only then read iterator or read whole iterator (which we do today). This isn't the implementation we started with, I think it can be improved significantly but there is no way around polling the inner flatMapped iterables and the input stream interchangeably? (though it may make sense to enforce the order in flatMap with concurrency=1) |
Iterators are strictly pull-based, and the default (possibly only) behavior when pulling twice from If I understand correctly, that matches the current behavior of stream's Note that there are a few different things you might mean by "flattening" in the context of (Feel free to ping me for any questions about async iterator helpers. The proposal is decidedly not finished, so what's written in the repo is not going to be very helpful.) |
38766cf
to
948b245
Compare
Order is enforced. Not sure why you think otherwise? |
What is the "logical" order of flatMap with concurrency? |
The only non-streaming alternative with concurrency is we buffer the intermediate iterators? |
Items exit in the same order they arrive. |
@ronag yes but items themselves are iterables of multiple items. If we want them to come in the same sub order we need to buffer all the sub iterations (without making them available to the consumer) while the user is waiting for the first iterable to complete. What behavior would you expect (unordered or ordered but buffers a lot)? Do you agree that the current behvior is kind of useless (w.r.t concurrency) when yielding async iterables? (the example in #52796 ) |
It is useless but correct IMHO |
It flattens everything and applies concurrency on the flat stream of entries. |
You can do an inner flat map on each stream or buffering to achieve concurrency. |
that is not the current behavior. it currently applies concurrency on getting/creating an iterator from the mapping function wich is synchronous in the case of async iterators |
@ronag so you would prefer we buffer the inner async iterables and yield them "in order" from flatMap? In terms of scheduling the sub-tasks do you have a better idea than round robin? |
Not round robin. Depth first. |
Fixes: #52796
this PR causes
flatMap
to iterate over the stream and mapped iterators in round-robin. I am not sure if this is the best approach but also not sure if there is a better one.I am also not exactly sure how to test this
I'd love to hear opinions from @nodejs/streams folks.