Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new consumer level event which fires on a rebalance #1067

Merged
merged 3 commits into from
Apr 1, 2021

Conversation

benbirt97
Copy link
Contributor

Description

I have added a rebalancing event on the consumer so that applications using KafkaJS have an easy interface they can use in order to be notified a rebalance is going on.

Reason

Currently there is no simple way to know when a rebalance has occurred. We encountered an issue where we had metrics which tracked partition specific consumer lag. However if the application lost the partition due to a rebalance there was no way to remove the metric, as the app didn't know a rebalance had occurred. This event will allow us to be informed of when a rebalance has occurred so we can reset these metrics within the app.

See issue #1066

Changes

  • Added an event emitter for rebalancing on the consumer
  • Added test to check that the rebalancing works
  • Updated types
  • Updated docs

Copy link
Collaborator

@Nevon Nevon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change looks good overall, just one minor thing to address in the tests.

timestamp: expect.any(Number),
type: 'consumer.rebalancing',
payload: {
groupId: expect.any(String),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's assert that these are correct. The groupId you have as a local variable in your test, and the memberId you can get from the GROUP_JOIN event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, I've updated the test. I also added a type for the result at the same time 👍

@@ -495,6 +500,11 @@ module.exports = class Runner extends EventEmitter {
retryTime,
})

this.instrumentationEmitter.emit(REBALANCING, {
Copy link
Collaborator

@Nevon Nevon Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we actually have kind of a bug here. Not a functional one, but a misleading one. If we try to commit and get a NOT_COORDINATOR_FOR_GROUP error, we re-join the group, which involves finding the new coordinator and will trigger a rebalance, but it doesn't mean that the group is currently rebalancing.

I wonder if we could get away with just trying to find the new coordinator and retrying the commit, rather than rejoining the group. This code has been around for a few years, so I don't remember the exact context, but it doesn't make sense to me that we'd consider NOT_COORDINATOR_FOR_GROUP to be indicative of an ongoing rebalance. It could just mean that a new broker has become coordinator for the group.

Should be possible to try this out by having a consumer group running and shutting down the coordinator for the group and seeing what happens.

But this is not related to this PR. Just something that I noticed that should be investigated.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue for this #1069

@benbirt97
Copy link
Contributor Author

All tests seemed to work when I ran them, looks like the memberId is being overwritten before it hits the check. Will have a look into this in a bit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants