
Simplify the BufferedAsyncIterator #671

Merged

Conversation

@ankon ankon commented Mar 19, 2020

The goal of that iterator is to take a number of promises in unknown states and generate
their results in the order in which these promises resolve or reject.

This commit changes the implementation so that we no longer need an event emitter,
a result queue, and the (fairly complex) logic for synchronizing the two. Instead the iterator
builds two parallel queues, one containing unresolved promises and one containing the [resolve, reject]
functions for these promises. Each incoming promise is then configured to pop the next [resolve, reject]
pair and call it with the respective result. The iterator at the same time picks the next unresolved
promise from its own queue and awaits the result.

On the runner side this requires awaiting iterator.next(), which actually simplifies more code
by removing the need for a separate queue of "enqueued tasks".

Note that this code also removes the configurable "handleError" logic without replacement: no caller
ever provided one, so the default of "throw the error further" is good enough.
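
To make the two-queue idea concrete, here is a minimal sketch of how such an iterator could look. It is an assumption for illustration only, not the actual KafkaJS source, written as the async-generator variant this description refers to (the version eventually merged goes back to a plain generator that yields the buffered promise, see further down):

// Sketch only: produce results in the order the input promises settle.
async function* BufferedAsyncIterator(promises) {
  const buffered = [] // placeholder promises, handed out in order
  const settlers = [] // matching [resolve, reject] pairs, popped as inputs settle

  for (let i = 0; i < promises.length; i++) {
    buffered.push(new Promise((resolve, reject) => settlers.push([resolve, reject])))
  }

  for (const promise of promises) {
    promise.then(
      value => {
        const [resolve] = settlers.shift()
        resolve(value)
      },
      error => {
        const [, reject] = settlers.shift()
        reject(error)
      }
    )
  }

  while (buffered.length > 0) {
    // Settles as soon as the next input settles, in completion order.
    yield await buffered.shift()
  }
}

Because the [resolve, reject] pairs are popped in the order the placeholders were handed out, while the inputs settle in their own order, the k-th value produced is simply the k-th result to become available; a rejection is rethrown to the consumer through the same mechanism.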

ankon commented Mar 19, 2020

This works fine in my local tests, but given that I have issues getting the KafkaJS tests to run reliably, I hope the Azure checks will show that the unit/integration tests also pass.

NOTE: This is currently on top of master, i.e. the bug fixed with #670 is still here

@ankon ankon force-pushed the pr/fetch-buffered-async-iterator-simplify branch from 8184c1f to 3986628 Compare March 19, 2020 10:22
if (numberOfExecutions === expectedNumberOfExecutions) {
  unlock()
}
batches.forEach(batch =>

@ankon (Contributor Author)

Note here: I changed batches.map to batches.forEach, because map implies that the result is used, but here the side-effects are what count and the result is ignored.

@Nevon Nevon requested a review from tulios March 19, 2020 16:10
@ankon ankon force-pushed the pr/fetch-buffered-async-iterator-simplify branch from 3986628 to eb0a771 Compare April 2, 2020 17:19

ankon commented Apr 2, 2020

Rebased onto master to resolve the conflict introduced with #670.

I think it would be good to pull this in before 1.13 if possible, as it simplifies code introduced in 1.13 -- so less variation in the releases :)

  await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
} catch (e) {
  unlockWithError(e)
} finally {

@goriunov (Contributor)

Was just comparing with my PR #705 :) TBH I think finally is not needed here: if we fail it will call unlockWithError, and even if we later call unlock nothing will happen, as the promise has already been rejected.
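
(For context, a barrier here is presumably a small utility along these lines; this is an assumed sketch, not the actual KafkaJS util, but it illustrates why a later unlock() is harmless once unlockWithError() has run: a promise can only settle once.)

// Assumed sketch of a barrier(): a promise plus its externally exposed settle functions.
const barrier = () => {
  let unlock
  let unlockWithError
  const lock = new Promise((resolve, reject) => {
    unlock = resolve
    unlockWithError = reject
  })
  return { lock, unlock, unlockWithError }
}

const { lock, unlock, unlockWithError } = barrier()
unlockWithError(new Error('boom'))
unlock() // no-op: the lock promise is already rejected
lock.catch(error => console.log(error.message)) // logs "boom"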

let expectedNumberOfExecutions = 0
let numberOfExecutions = 0
const { lock, unlock, unlockWithError } = barrier()

while (true) {
-  const result = iterator.next()
+  const result = await iterator.next()

@goriunov (Contributor)

I was a bit concerned about awaiting in the while loop, as it is not going to wait for this and will just loop and saturate the CPU... (this is just an assumption, I have not validated that...)

@ankon (Contributor Author)

The await-ing here is needed because the generator is now doing funny async stuff as well. It shouldn't be a busy loop though: it awaits here for the next promise in the BufferedAsyncIterator to resolve.
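
(A tiny standalone illustration of that point, with made-up names standing in for the runner and the iterator: the await suspends the loop until the next value is ready, so the loop does not spin.)

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// A made-up async generator standing in for the BufferedAsyncIterator here.
async function* example() {
  await sleep(100)
  yield 'first'
  await sleep(100)
  yield 'second'
}

async function drain(iterator) {
  while (true) {
    // Parks here until the generator has produced the next value; the loop never spins.
    const result = await iterator.next()
    if (result.done) {
      break
    }
    console.log(result.value)
  }
}

drain(example())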

@goriunov (Contributor)

Probably would be a good idea to join some of my changes from #705 with yours! 😄

ankon commented Apr 21, 2020

> Probably would be a good idea to join some of my changes from #705 with yours! 😄

Indeed, that would be my plan as well. Given that #705 fixes an actual problem, I'd think the best approach here could be to wait for #705 to land, and then to rebase this one. WDYT?

@ankon ankon mentioned this pull request May 29, 2020
ankon added a commit to Collaborne/kafkajs that referenced this pull request May 29, 2020
…e fix for the unhandled promise rejections

The earlier merge unfortunately "restored" the problem that was initially fixed with tulios#671, by introducing another
`await lock` call in a situation where there wouldn't be any unlocking.

ankon commented May 30, 2020

> Indeed, that would be my plan as well. Given that #705 fixes an actual problem, I'd think the best approach here could be to wait for #705 to land, and then to rebase this one. WDYT?

Small update: #705 is replaced by #714 of course, and it seems that merging these together won't be much of an issue. We're now (temporarily) using our own fork of KafkaJS with both changes applied, as the changes in the beta are too tempting to not use and they do fix at least one issue for us. Our fork is at https://github.com/Collaborne/kafkajs/tree/collaborne, and can be used for reference (but obviously shouldn't be used in production by anyone unless you know what you're doing -- it's a prerelease, after all!)

@goriunov (Contributor)

@ankon just went through this change again and noticed that after your change, requests are not processed asynchronously any more, as it will wait for every single request to be processed in order; before, they were processed asynchronously in Promise.all().
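
(To make the concern concrete, a hypothetical before/after with made-up helper names; this is not the actual runner code.)

// Hypothetical illustration of the concern, not the runner implementation.
const handleBatch = async batch => { /* process one batch */ }

// Before: all batches are started together and awaited collectively.
const processConcurrently = batches => Promise.all(batches.map(handleBatch))

// The concern: awaiting each batch in turn means they no longer overlap.
const processSequentially = async batches => {
  for (const batch of batches) {
    await handleBatch(batch)
  }
}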

ankon commented Jun 1, 2020

Hmm.

I'll have a look; that's certainly not the idea here. Good hint, maybe I should write a test showing that it indeed processes things asynchronously. :)

ankon commented Jun 2, 2020

While looking further into this, I find the existing code harder and harder to reason about.

  1. There is probably a way to reject a promise twice, because the event emitter's error listener isn't removed (while the data listener is). This is likely benign, as per http://www.ecma-international.org/ecma-262/6.0/#sec-promise-reject-functions, but confusing to read.
  2. I'm thinking that there is also an opportunity for a race where an error might get eaten. This probably requires very careful timing and some asynchronicity in processing batches to trigger, but assuming that there are results in the queue and the next promise is quick enough to error, it seems that there won't be an error handler installed for that error -- except for the ones left over from the previous problem, of course. :)
  3. While it is true that it looks "more concurrent" (less "await"-y), it's difficult to see exactly where stuff is getting blocked on which conditions. In the end we have the async iterator doing things, the consumer group/runner doing things, and of course the concurrency-limiting also doing its thing.

That might all be me missing something while wrapping my head around this again, of course.

On the good news side: there are already tests for the concurrency behavior of the iterator, and I definitely missed adapting these -- so I'll see whether I can extend them further as well.

@ankon ankon force-pushed the pr/fetch-buffered-async-iterator-simplify branch from eb0a771 to 6b38635 Compare June 2, 2020 16:35
@ankon ankon changed the title Simplify the BufferedAsyncIterator and runner fetch driver Simplify the BufferedAsyncIterator Jun 2, 2020

ankon commented Jun 2, 2020

I went over the changes now and rebased them onto master.

I also decided to drop the change from a generator to an async generator for now, because as @goriunov pointed out there is definitely a change in how/where the asynchronicity happens. This makes the change simpler, and more obviously a refactoring: the interface of BufferedAsyncIterator stays the same, the tests stay the same (and still pass :D), but the whole thing lost half of its lines and hopefully also got a lot easier to process as a human.

This PR now also has two smaller JSDoc improvements to sprinkle in some type information, which helps when reading this code; tell me if I should rather create separate PRs for these.

@ankon ankon force-pushed the pr/fetch-buffered-async-iterator-simplify branch from 6b38635 to 3152ca9 Compare June 2, 2020 21:03

t-d-d commented Jun 14, 2020

+1. I don't understand the original code but do understand this.

Nevon commented Jul 20, 2020

I wasn't quite sure about the implications of this, since the code is quite complex and it's easy to make a change that accidentally changes the asynchronous behavior, so I wrote a test that uses trace_events to create a timeline for how the promises behave. The test setup itself was simply:

const trace_events = require('trace_events')
const BufferedAsyncIterator = require(`./${process.argv[2]}.js`)

// Assumed helper, not shown in the original snippet: resolve after ms milliseconds
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

const tracing = trace_events.createTracing({ categories: ['node.async_hooks'] })
tracing.enable()

const main = async () => {
    const promises = [sleep(300).then(() => 1), sleep(100).then(() => 2), sleep(500).then(() => 3)]
    const iterator = BufferedAsyncIterator(promises)

    while (true) {
        const result = iterator.next()
        if (result.done) {
            break
        }

        await result.value
    }
}

main().then(() => tracing.disable())

And then I ran it with both implementations and compared the results in Chrome's trace view. Left is the original implementation and right is the simplified one.

[Screenshot 2020-07-20 at 20 04 42: trace timelines side by side, original implementation (left) vs. simplified implementation (right)]

As you can see, the behavior is identical between the different implementations.

The traces are available here:
original-50845-1.log
simplified-50858-1.log

The goal of that iterator is to take a number of promises in unknown states and generate
their results in the order in which these promises resolve or reject.

This commit changes the implementation so that we no longer need an event emitter *and*
a result queue and the (fairly complex) logic for synchronizing these. Instead the iterator
builds two parallel queues, one containing unresolved promises and one containing the resolve and reject
functions for these promises. Each incoming promise is then configured to pop the next {resolve, reject}
structure and call it with the respective result. The iterator at the same time picks the next unresolved
promise from its own queue and yields it.
@ankon ankon force-pushed the pr/fetch-buffered-async-iterator-simplify branch from 3152ca9 to 114209a Compare July 21, 2020 08:13

ankon commented Jul 21, 2020

(Rebased onto current master)

Nevon commented Jul 21, 2020

Did some more checks to see that error recovery was working the same way, and everything seems good. I also find this implementation a lot easier to understand, so I'm gonna go ahead and merge now that I'm confident that it's not changing the behavior. Thanks for the refactor!

@Nevon Nevon merged commit e1581e5 into tulios:master Jul 21, 2020
@ankon ankon deleted the pr/fetch-buffered-async-iterator-simplify branch July 21, 2020 15:36
@Nevon Nevon mentioned this pull request Feb 11, 2022