Add partitions to auto pause on consumer side topic access error #981

Merged
merged 6 commits into from
Mar 7, 2024

@hshukla hshukla commented Feb 22, 2024

When a Kafka consumer does not have access to a topic, it gets org.apache.kafka.common.errors.TopicAuthorizationException with the message "Not authorized to access topics: [topicA, topicB]". Because this is a permissions issue, it is unlikely to be resolved right away, so subsequent polls for the same topic will probably hit the same error. This becomes a problem when the consumer is also subscribed to partitions of other topics that it can access without any issues.

This PR adds the partitions of the unauthorized topics to the pause list. The auto-pause lasts 5 minutes (config-driven), the same as the producer-side pause.

Before each poll, the task checks whether any partitions need to be paused or resumed. This logic already exists, and it will take care of resuming partitions that were paused because of the consumer-side access issue.
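
To illustrate the pause-and-resume flow described above, here is a minimal sketch. It is not the code from this PR: `AutoPauseTracker`, its method names, and the use of plain strings as partition identifiers are all hypothetical, and the clock is passed in explicitly so the behavior is easy to check.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not the class used in the PR.
final class AutoPauseTracker {
  private final Duration pauseDuration;
  // Partition id (for example "topicA-0") -> instant at which its pause expires.
  private final Map<String, Instant> pausedUntil = new HashMap<>();

  AutoPauseTracker(Duration pauseDuration) {
    this.pauseDuration = pauseDuration;
  }

  // Called when the consumer reports that it is not authorized for these partitions.
  void pauseForAccessError(Set<String> partitions, Instant now) {
    for (String partition : partitions) {
      pausedUntil.put(partition, now.plus(pauseDuration));
    }
  }

  // Called before each poll: forgets expired pauses and returns the partitions to resume.
  Set<String> collectExpired(Instant now) {
    Set<String> toResume = new HashSet<>();
    Iterator<Map.Entry<String, Instant>> it = pausedUntil.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Instant> entry = it.next();
      if (!now.isBefore(entry.getValue())) {
        toResume.add(entry.getKey());
        it.remove();
      }
    }
    return toResume;
  }

  // Snapshot of the partitions that are still paused.
  Set<String> currentlyPaused() {
    return new HashSet<>(pausedUntil.keySet());
  }
}
```

In a real task, the result of `collectExpired` would be handed to the consumer's resume call and the remaining set to its pause call; that wiring is omitted here.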

  • Add leader-busy metrics to show how busy the leader is

@@ -2364,6 +2368,7 @@ public void run() {
    CoordinatorEvent event = _eventQueue.take();
    if (event != null) {
      handleEvent(event);
      _isLeaderBusy = false;
Collaborator

nit: should we set and unset _isLeaderBusy in the same scope? That makes the code much easier to maintain.

One way is:

_isLeaderBusy = _adapter.isLeader();
handleEvent(event);
_isLeaderBusy = false;

Or set and unset it inside handleEvent() itself. Wdyt?

Collaborator Author

I agree that it would be better to set and unset within the same scope. I think I will go with setting/unsetting inside the run() method, since that is the scope of the leader coordinator thread. Right now we only call handleEvent() there; in the future, if we enhance the leader to perform other tasks, we will not need to change anything.
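
A minimal sketch of setting and clearing the flag in one scope is shown below. It is an assumption-laden illustration, not the merged code: `LeaderEventLoop`, `processNext`, and the string events are hypothetical, the leadership check is passed in as a `BooleanSupplier` standing in for `_adapter.isLeader()`, and the queue is read with a non-blocking `poll()` rather than `take()` so the example can run without a second thread. The `try`/`finally` is an addition that keeps the flag from staying set if the handler throws.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical stand-in for the coordinator's event thread; names are illustrative.
final class LeaderEventLoop {
  private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
  private final BooleanSupplier isLeader;   // stands in for _adapter.isLeader()
  private final Consumer<String> handler;   // stands in for handleEvent(event)
  private volatile boolean leaderBusy = false;

  LeaderEventLoop(BooleanSupplier isLeader, Consumer<String> handler) {
    this.isLeader = isLeader;
    this.handler = handler;
  }

  void submit(String event) {
    eventQueue.add(event);
  }

  boolean isLeaderBusy() {
    return leaderBusy;
  }

  // Processes at most one queued event; returns false if the queue was empty.
  boolean processNext() {
    String event = eventQueue.poll();
    if (event == null) {
      return false;
    }
    // Set and clear the flag in the same scope, around whatever work the leader does.
    leaderBusy = isLeader.getAsBoolean();
    try {
      handler.accept(event);
    } finally {
      leaderBusy = false;
    }
    return true;
  }
}
```

Any additional leader work added later would go inside the same `try` block, so the flag handling would not need to change.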

    final Set<String> unauthorizedTopics = tae.unauthorizedTopics();
    _logger.warn("Not authorized to access, they will be added to auto-pause set, topics={}", unauthorizedTopics);
    for (String topic : unauthorizedTopics) {
      List<PartitionInfo> partitionInfos = _consumer.partitionsFor(topic);
Collaborator

What is the desired behavior if partitionsFor throws an exception in this scenario?

Collaborator Author

I have not seen it throw an exception. However, the description of KafkaConsumer::partitionsFor() says it can throw, which is interesting. In that case, I think we should just catch and ignore the exception, so the behavior stays the same as what we have in the prod version right now.
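
The catch-and-ignore approach could look roughly like the sketch below. This is an illustration under stated assumptions, not the merged change: `UnauthorizedTopicResolver` and `resolvePartitions` are hypothetical names, the `Function` parameter stands in for `_consumer.partitionsFor(topic)` so the example runs without a Kafka dependency, and it assumes lookup failures surface as unchecked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper for illustration only; not the code from this PR.
final class UnauthorizedTopicResolver {
  // partitionLookup stands in for _consumer.partitionsFor(topic) and returns partition numbers.
  static List<String> resolvePartitions(Set<String> unauthorizedTopics,
                                        Function<String, List<Integer>> partitionLookup) {
    List<String> partitionsToPause = new ArrayList<>();
    for (String topic : unauthorizedTopics) {
      try {
        List<Integer> partitions = partitionLookup.apply(topic);
        if (partitions == null) {
          continue; // no metadata available for this topic
        }
        for (Integer partition : partitions) {
          partitionsToPause.add(topic + "-" + partition);
        }
      } catch (RuntimeException e) {
        // Ignore the failure: this topic is simply not auto-paused,
        // which matches the behavior before the auto-pause was introduced.
      }
    }
    return partitionsToPause;
  }
}
```

A failure for one topic does not stop the loop, so the remaining unauthorized topics are still added to the pause set.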

@hshukla hshukla merged commit cac66bf into master Mar 7, 2024
1 check passed
@hshukla hshukla deleted the partition-pause-on-consumer-error branch March 7, 2024 04:35