Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement for Kafka Admin Client's "Describe Consumer Group" #2035

Merged
merged 4 commits into from
Sep 17, 2020

Conversation

Apurva007
Copy link
Contributor

@Apurva007 Apurva007 commented Apr 9, 2020

First of all thank you for creating and maintaining this kafka client. I utilize it at work for some of the Kafka admin functionalities.

I was trying to utilize the "describe_consumer_groups" function and found that it was returning a raw string output of the response. This is as per the documentation.
The issue that I faced was that the internal member_metadata and member_assignment had still some encoded data.

Eg. output before changes:

[(0, u'testgrp', u'Stable', u'consumer', u'range', [(u'consumer-testgrp-1-aff0c665-590e-4921-8b49-034501f0a940', u'consumer-testgrp-1', u'/10.0.0.241', '\x00\x01\x00\x00\x00\x01\x00\x04test\xff\xff\xff\xff\x00\x00\x00\x00', '\x00\x01\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\xff\xff\xff\xff')])]

Therefore, I am creating this pull request to help return a "namedtuple" with the member_metadata and member_assignment being decoded. The logic is very similar to the Kafka Java client implementation.

Eg. output after changes:

[GroupInformation(error_code=0, group=u'testgrp', state=u'Stable', protocol_type=u'consumer', protocol=u'range', members=[MemberInformation(member_id=u'consumer-testgrp-1-aff0c665-590e-4921-8b49-034501f0a940', client_id=u'consumer-testgrp-1', client_host=u'/10.0.0.241', member_metadata=ConsumerProtocolMemberMetadata(version=1, subscription=[u'test'], user_data=None), member_assignment=ConsumerProtocolMemberAssignment(version=1, assignment=[(topic=u'test', partitions=[0])], user_data=None))], authorized_operations=[])]

Please let me know your thoughts. Thanks!


This change is Reviewable

…zation of MemberData and MemberAssignment for the response
Copy link
Collaborator

@jeffwidman jeffwidman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much for working on this! The general idea is definitely something I'd be happy to merge.

However, the actual implementation appears to have some bugs... looks like they're easily fixable though.

kafka/admin/client.py Outdated Show resolved Hide resolved
kafka/admin/client.py Outdated Show resolved Hide resolved
kafka/admin/client.py Outdated Show resolved Hide resolved
kafka/admin/client.py Show resolved Hide resolved
"""
consumer = kafka_consumer_factory(group_id='testgrp', auto_offset_reset='earliest')
consumer.poll(timeout_ms=20)
output = kafka_admin_client.describe_consumer_groups(['testgrp'])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should probably have 2 consumer groups... I think that would catch the possible error above where it looks like only the value of the last consumer group is being returned.

Also, I'd probably have 2 consumers in one of the groups, and then verify that there are two member metadata for the one consumer group.

So that'd take 3 consumers altogether, split between two groups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I will make that change.

kafka/admin/client.py Outdated Show resolved Hide resolved
kafka/admin/client.py Outdated Show resolved Hide resolved
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
if group_information_name == 'protocol_type':
protocol_type = described_group_information
is_consumer_protocol_type = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the logic of what you're trying to do here a little more? It's a bit confusing as currently written so difficult for me to figure out what you're trying to do here...

Copy link
Contributor Author

@Apurva007 Apurva007 Apr 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically its checking if the protocol_type returned by the broker is an empty string or if it is "consumer" then it needs to execute the extra decoding of the member metadata and member assignment fields in the member array.
The Kafka Java implementation for this:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L2881
Hope it helps. Please let me know if you need more information.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense. One suggestion--for your github links, hit the y key on your keyboard and it will tie the link to a specific commit, rather than worrying about trunk changing under your feet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip! I will use that next time. 😃

kafka/admin/client.py Outdated Show resolved Hide resolved
kafka/admin/client.py Outdated Show resolved Hide resolved
@Apurva007
Copy link
Contributor Author

@jeffwidman Thanks alot for reviewing the PR! I will make the suggested changes.

@jeffwidman
Copy link
Collaborator

Nudge @Apurva007

@Apurva007
Copy link
Contributor Author

Hey @jeffwidman ! Sorry for the delay. Been busy with some work related stuff. I will update the PR by next weekend.

@jeffwidman
Copy link
Collaborator

nudge @Apurva007

@Apurva007
Copy link
Contributor Author

@jeffwidman Thanks for taking the time to review the PR.
I have made the requested changes. Please can you re-review it. I will be happy to make any more changes if required.

…ion response processing more readable, enhanced tests
@Apurva007
Copy link
Contributor Author

nudge @jeffwidman

2 similar comments
@Apurva007
Copy link
Contributor Author

nudge @jeffwidman

@Apurva007
Copy link
Contributor Author

nudge @jeffwidman

ACLResourcePatternType
from kafka.protocol.types import Array
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
from kafka.vendor import six
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one shouldn't be moved since it's merely a vendored import of a standard 3p lib... (and thankfully one that will disappear in not too distant future!)
I'll try to add a commit to this PR fixing this up... otherwise will merge and re-fix after

@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
"""Tests that the describe consumer group call returns valid consumer group information
This test takes inspiration from the test 'test_group' in test_consumer_group.py.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long term we should probably move this to a dedicated pytest fixture, but that's definitely out of scope of this PR!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree! Let me see if I can add this in a subsequent PR.

@jeffwidman jeffwidman merged commit 26b8400 into dpkp:master Sep 17, 2020
@jeffwidman
Copy link
Collaborator

Thank you! And my apologies for the delay, I've switched teams at my day job and that sucked up a lot more of my time lately.

@Apurva007
Copy link
Contributor Author

@jeffwidman No issues. I can completely understand. Thanks again for re-reviewing the PR!

jeffwidman added a commit that referenced this pull request Sep 17, 2020
Small cleanup leftover from #2035
@jeffwidman jeffwidman mentioned this pull request Sep 17, 2020
jeffwidman added a commit that referenced this pull request Sep 17, 2020
Small cleanup leftover from #2035
gabriel-tincu pushed a commit to aiven/kafka-python that referenced this pull request Sep 22, 2020
…2035)

Adding namedtuples for DescribeConsumerGroup response; Adding Serialization of MemberData and MemberAssignment for the response

Co-authored-by: Apurva Telang <atelang@paypal.com>
Co-authored-by: Jeff Widman <jeff@jeffwidman.com>
gabriel-tincu pushed a commit to aiven/kafka-python that referenced this pull request Sep 22, 2020
Small cleanup leftover from dpkp#2035
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants