-
-
Notifications
You must be signed in to change notification settings - Fork 530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a new consumer level event which fires on a rebalance #1067
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change looks good overall, just one minor thing to address in the tests.
timestamp: expect.any(Number), | ||
type: 'consumer.rebalancing', | ||
payload: { | ||
groupId: expect.any(String), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's assert that these are correct. The groupId
you have as a local variable in your test, and the memberId
you can get from the GROUP_JOIN
event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, I've updated the test. I also added a type for the result at the same time 👍
@@ -495,6 +500,11 @@ module.exports = class Runner extends EventEmitter { | |||
retryTime, | |||
}) | |||
|
|||
this.instrumentationEmitter.emit(REBALANCING, { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we actually have kind of a bug here. Not a functional one, but a misleading one. If we try to commit and get a NOT_COORDINATOR_FOR_GROUP
error, we re-join the group, which involves finding the new coordinator and will trigger a rebalance, but it doesn't mean that the group is currently rebalancing.
I wonder if we could get away with just trying to find the new coordinator and retrying the commit, rather than rejoining the group. This code has been around for a few years, so I don't remember the exact context, but it doesn't make sense to me that we'd consider NOT_COORDINATOR_FOR_GROUP to be indicative of an ongoing rebalance. It could just mean that a new broker has become coordinator for the group.
Should be possible to try this out by having a consumer group running and shutting down the coordinator for the group and seeing what happens.
But this is not related to this PR. Just something that I noticed that should be investigated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created an issue for this #1069
All tests seemed to work when I ran them, looks like the memberId is being overwritten before it hits the check. Will have a look into this in a bit |
Description
I have added a
rebalancing
event on the consumer so that applications using KafkaJS have an easy interface they can use in order to be notified a rebalance is going on.Reason
Currently there is no simple way to know when a rebalance has occurred. We encountered an issue where we had metrics which tracked partition specific consumer lag. However if the application lost the partition due to a rebalance there was no way to remove the metric, as the app didn't know a rebalance had occurred. This event will allow us to be informed of when a rebalance has occurred so we can reset these metrics within the app.
See issue #1066
Changes