-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enhancement for Kafka Admin Client's "Describe Consumer Group" #2035
Conversation
…zation of MemberData and MemberAssignment for the response
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so much for working on this! The general idea is definitely something I'd be happy to merge.
However, the actual implementation appears to have some bugs... looks like they're easily fixable though.
test/test_admin_integration.py
Outdated
""" | ||
consumer = kafka_consumer_factory(group_id='testgrp', auto_offset_reset='earliest') | ||
consumer.poll(timeout_ms=20) | ||
output = kafka_admin_client.describe_consumer_groups(['testgrp']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test should probably have 2 consumer groups... I think that would catch the possible error above where it looks like only the value of the last consumer group is being returned.
Also, I'd probably have 2 consumers in one of the groups, and then verify that there are two member metadata for the one consumer group.
So that'd take 3 consumers altogether, split between two groups.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I will make that change.
kafka/admin/client.py
Outdated
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields): | ||
if group_information_name == 'protocol_type': | ||
protocol_type = described_group_information | ||
is_consumer_protocol_type = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain the logic of what you're trying to do here a little more? It's a bit confusing as currently written so difficult for me to figure out what you're trying to do here...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically its checking if the protocol_type returned by the broker is an empty string or if it is "consumer" then it needs to execute the extra decoding of the member metadata and member assignment fields in the member array.
The Kafka Java implementation for this:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L2881
Hope it helps. Please let me know if you need more information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense. One suggestion--for your github links, hit the y
key on your keyboard and it will tie the link to a specific commit, rather than worrying about trunk
changing under your feet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the tip! I will use that next time. 😃
@jeffwidman Thanks alot for reviewing the PR! I will make the suggested changes. |
Nudge @Apurva007 |
Hey @jeffwidman ! Sorry for the delay. Been busy with some work related stuff. I will update the PR by next weekend. |
nudge @Apurva007 |
@jeffwidman Thanks for taking the time to review the PR. |
…ion response processing more readable, enhanced tests
nudge @jeffwidman |
kafka/admin/client.py
Outdated
ACLResourcePatternType | ||
from kafka.protocol.types import Array | ||
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation | ||
from kafka.vendor import six |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one shouldn't be moved since it's merely a vendored import of a standard 3p lib... (and thankfully one that will disappear in not too distant future!)
I'll try to add a commit to this PR fixing this up... otherwise will merge and re-fix after
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') | ||
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic): | ||
"""Tests that the describe consumer group call returns valid consumer group information | ||
This test takes inspiration from the test 'test_group' in test_consumer_group.py. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Long term we should probably move this to a dedicated pytest
fixture, but that's definitely out of scope of this PR!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree! Let me see if I can add this in a subsequent PR.
Thank you! And my apologies for the delay, I've switched teams at my day job and that sucked up a lot more of my time lately. |
@jeffwidman No issues. I can completely understand. Thanks again for re-reviewing the PR! |
Small cleanup leftover from #2035
Small cleanup leftover from #2035
…2035) Adding namedtuples for DescribeConsumerGroup response; Adding Serialization of MemberData and MemberAssignment for the response Co-authored-by: Apurva Telang <atelang@paypal.com> Co-authored-by: Jeff Widman <jeff@jeffwidman.com>
Small cleanup leftover from dpkp#2035
First of all thank you for creating and maintaining this kafka client. I utilize it at work for some of the Kafka admin functionalities.
I was trying to utilize the "describe_consumer_groups" function and found that it was returning a raw string output of the response. This is as per the documentation.
The issue that I faced was that the internal member_metadata and member_assignment had still some encoded data.
Eg. output before changes:
Therefore, I am creating this pull request to help return a "namedtuple" with the member_metadata and member_assignment being decoded. The logic is very similar to the Kafka Java client implementation.
Eg. output after changes:
Please let me know your thoughts. Thanks!
This change is