Simplify the BufferedAsyncIterator #671
This works fine in my local tests, but since I have trouble getting the KafkaJS tests to run reliably, I hope the Azure checks will show that the unit/integration tests also pass. NOTE: This is currently on top of master, i.e. the bug fixed with #670 is still here.
8184c1f
to
3986628
Compare
src/consumer/runner.js
Outdated
if (numberOfExecutions === expectedNumberOfExecutions) {
  unlock()
}
batches.forEach(batch =>
Note here: I changed `batches.map` to `batches.forEach`, because `map` implies that the result is used, but here the side effects are what count and the result is ignored.
3986628
to
eb0a771
Compare
Rebased onto master to resolve the conflict introduced with #670. I think it would be good to pull this in before 1.13 if possible, as it simplifies code introduced in 1.13 -- so less variation in the releases :)
src/consumer/runner.js
Outdated
  await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
} catch (e) {
  unlockWithError(e)
} finally {
I was just comparing with my PR #705 :) TBH I think `finally` is not needed here: if we fail, it will call `unlockWithError`, and even if we later call `unlock`, nothing will happen because the promise has already been rejected.
src/consumer/runner.js
Outdated
let expectedNumberOfExecutions = 0
let numberOfExecutions = 0
const { lock, unlock, unlockWithError } = barrier()

while (true) {
-  const result = iterator.next()
+  const result = await iterator.next()
I was a bit concerned about awaiting in the while loop, as it is not going to wait for this and will just loop and saturate the CPU... (this is just an assumption; I have not validated it...)
The `await` here is needed because the generator is now doing funny async stuff as well. It shouldn't be a busy loop, though; rather, it waits here for the next promise in the `BufferedAsyncIterator` to resolve.
Probably would be good idea to join some of my changes from #705 with yours ! 😄 |
…e fix for the unhandled promise rejections The earlier merge unfortunately "restored" the problem that was initially fixed with tulios#671, by introducing another `await lock` call in a situation where there wouldn't be any unlocking.
Small update: #705 is replaced by #714 of course, and it seems that merging these together won't be much of an issue. We're now (temporarily) using our own fork of KafkaJS with both changes applied, as the changes in the beta are too tempting to not use and they do fix at least one issue for us. Our fork is at https://github.com/Collaborne/kafkajs/tree/collaborne, and can be used for reference (but obviously shouldn't be used in production by anyone unless you know what you're doing -- it's a prerelease, after all!) |
@ankon I just went through this change again and noticed that after your change, requests are no longer processed asynchronously: it waits for every single request to be processed in order, whereas before they were processed asynchronously in `Promise.all()`.
Hmm. I'll have a look, that's certainly not the idea here. Good hint to maybe write a test showing that it indeed processes things asynchronously. :) |
While looking further here I find the existing code harder and harder to reason about.
That might all be me missing something while wrapping my head around this again, of course. On the good news side: there are already tests for the concurrency behavior of the iterator, and I definitely failed to adapt these, so I'll see whether I can extend them further as well.
eb0a771
to
6b38635
Compare
I went over the changes now and rebased them onto master. I also decided to remove the change from a generator to an async generator for now, because as @goriunov pointed out there is definitely a change in how/where the asynchronicity happens. This makes the change simpler, and more obviously a refactoring: the interface of BufferedAsyncIterator stays the same, the tests stay the same (and still pass :D), but the whole thing lost half of its lines and hopefully also got a lot easier for a human to process. This PR now also has two smaller JSDoc improvements that sprinkle in some type information, which helps when reading this code as well; let me know if I should create separate PRs for these.
6b38635
to
3152ca9
Compare
+1. I don't understand the original code but do understand this. |
I wasn't quite sure about the implications of this, since the code is quite complex and it's easy to make a change that accidentally changes the asynchronous behavior, so I wrote a test that uses trace_events to create a timeline for how the promises behave. The test setup itself was simply:

    const trace_events = require('trace_events')
    const BufferedAsyncIterator = require(`./${process.argv[2]}.js`)

    // Resolve after `ms` milliseconds
    const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

    async function main() {
      const tracing = trace_events.createTracing({ categories: ['node.async_hooks'] })
      tracing.enable()
      const promises = [sleep(300).then(() => 1), sleep(100).then(() => 2), sleep(500).then(() => 3)]
      const iterator = BufferedAsyncIterator(promises)
      // Drain the iterator, awaiting each yielded promise in turn
      while (true) {
        const result = iterator.next()
        if (result.done) {
          break
        }
        await result.value
      }
      tracing.disable()
    }

    main()

And then I ran it with both implementations and compared the results in Chrome's trace view. Left is the original implementation and right is the simplified one. As you can see, the behavior is identical between the different implementations. The traces are available here:
The goal of that iterator is to take a number of promises in unknown states, and generate their results in the order of these promises resolving/rejecting. This commit changes the implementation in such a way that we do not need an event emitter *and* a result queue and a (fairly complex) logic of synchronizing these. Instead the iterator builds two parallel queues, one containing unresolved promises and one containing the resolve and reject functions for these promises. The incoming promises then are configured to pop the next {resolve, reject} structure, and call that with the respective result. The iterator at the same time picks the next unresolved promise from its own queue and yields that.
3152ca9
to
114209a
Compare
(Rebased onto current master) |
Did some more checks to see that error recovery was working the same way, and everything seems good. I also find this implementation a lot easier to understand, so I'm gonna go ahead and merge now that I'm confident that it's not changing the behavior. Thanks for the refactor! |
The goal of that iterator is to take a number of promises in unknown states, and generate
their results in the order of these promises resolving/rejecting.
This commit changes the implementation in such a way that we do not need an event emitter and
a result queue and a (fairly complex) logic of synchronizing these. Instead the iterator
builds two parallel queues, one containing unresolved promises and one containing the [resolve, reject]
functions for these promises. The incoming promises then are configured to pop the next [resolve, reject]
pair, and call that with the respective result. The iterator at the same time picks the next unresolved
promise from its own queue, and awaits the result.
On the runner side this requires awaiting iterator.next(), which actually simplifies more code
by removing the need for a separate queue of "enqueued tasks".
Note that this code also removes the configurable "handleError" logic without replacement: No caller
ever provided that, so the default of "throw the error further" is good enough.