-
Notifications
You must be signed in to change notification settings - Fork 136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add partitions to auto pause on consumer side topic access error #981
Conversation
@@ -2364,6 +2368,7 @@ public void run() { | |||
CoordinatorEvent event = _eventQueue.take(); | |||
if (event != null) { | |||
handleEvent(event); | |||
_isLeaderBusy = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we set and unset _isLeaderBusy in the same scope ? Much easier to maintain the code that way.
One way is:
_isLeaderBusy = _adapter.isLeader();
handleEvent();
_isLeaderBusy = false;
OR set and unset inside handleEvent() itself. Wdyt ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you that it would be better to set and unset within the same scope. I think I will go with setting/unsetting inside the run() method as that is what the leader coordinator thread scope is. Right now we do handleEvent()
in the future, if we enhance leader to perform other tasks, then we do not need to worry about changing anything.
final Set<String> unauthorizedTopics = tae.unauthorizedTopics(); | ||
_logger.warn("Not authorized to access, they will be added to auto-pause set, topics={}", unauthorizedTopics); | ||
for (String topic: unauthorizedTopics) { | ||
List<PartitionInfo> partitionInfos = _consumer.partitionsFor(topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the desired behavior if partitionsFor throws an exception in this scenario ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see it throwing exception. However, KafkaConsumer::partitionFor()
method description says there could be exception, interesting. In this case, I think we should just catch and ignore it, so the behavior would be same as what we have on prod version right now.
When Kafka consumer does not have access to topic, it gets
org.apache.kafka.common.errors.TopicAuthorizationException
withNot authorized to access topics: [topicA, topicB]
. As this is a perm issue, there are high chances that it will not be resolved right away, so the subsequent poll for the same topic may get the same error. This can cause the issue if the consumer was subscribed to more topics' partitions which consumer has access for and no issues.This PR will add the topics' partitions into the pause list if the consumer does not have access. The auto-pause is for 5 minutes(config driven); same as producer side pause.
before each poll, task checks if there are any partitions to be paused or resumed. This logic is already there, which will take care or resuming pause created due to consumer side access issue.