Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement for Kafka Admin Client's "Describe Consumer Group" #2035

Merged
merged 4 commits into from
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from __future__ import absolute_import

from collections import defaultdict
from collections import defaultdict, namedtuple
import copy
import logging
import socket

from . import ConfigResourceType
from kafka.vendor import six

from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
from kafka.client_async import KafkaClient, selectors
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
Expand All @@ -19,9 +21,9 @@
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
from kafka.protocol.types import Array
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
from kafka.vendor import six
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one shouldn't be moved since it's merely a vendored import of a standard 3p lib... (and thankfully one that will disappear in not too distant future!)
I'll try to add a commit to this PR fixing this up... otherwise will merge and re-fix after

from kafka.version import __version__


Expand Down Expand Up @@ -1000,22 +1002,47 @@ def _describe_consumer_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION <= 3:
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
# hardcoding tuple indices here. Several Java examples,
# including KafkaAdminClient.java
group_description = response.groups[0]
error_code = group_description[0]
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
if isinstance(response_field, Array):
described_groups = response.__dict__[response_name]
described_groups_field_schema = response_field.array_of
described_group = response.__dict__[response_name][0]
described_group_information_list = []
protocol_type_is_consumer = False
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
if group_information_name == 'protocol_type':
protocol_type = described_group_information
protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
if isinstance(group_information_field, Array):
member_information_list = []
member_schema = group_information_field.array_of
for members in described_group_information:
member_information = []
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
if protocol_type_is_consumer:
if member_name == 'member_metadata' and member:
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
elif member_name == 'member_assignment' and member:
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
else:
member_information.append(member)
member_info_tuple = MemberInformation._make(member_information)
member_information_list.append(member_info_tuple)
described_group_information_list.append(member_information_list)
else:
described_group_information_list.append(described_group_information)
# Version 3 of the DescribeGroups API introduced the "authorized_operations" field. Responses from versions <= 2
# do not carry it, so GroupInformation._make() would fail on the missing value; append None as a placeholder.
if response.API_VERSION <=2:
described_group_information_list.append(None)
group_description = GroupInformation._make(described_group_information_list)
error_code = group_description.error_code
jeffwidman marked this conversation as resolved.
Show resolved Hide resolved
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"DescribeGroupsResponse failed with response '{}'."
.format(response))
# TODO Java checks the group protocol type, and if consumer
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
# the members' partition assignments... that hasn't yet been
# implemented here so just return the raw struct results
else:
raise NotImplementedError(
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
Expand Down
5 changes: 5 additions & 0 deletions kafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
["offset", "timestamp"])

# Record describing one member of a described group. Field order matters:
# instances are built positionally (e.g. via MemberInformation._make()).
MemberInformation = namedtuple(
    "MemberInformation",
    [
        "member_id",
        "client_id",
        "client_host",
        "member_metadata",
        "member_assignment",
    ],
)

# Record describing one group. Field order matters: instances are built
# positionally (e.g. via GroupInformation._make()), so all seven values must
# be supplied, including "authorized_operations".
GroupInformation = namedtuple(
    "GroupInformation",
    [
        "error_code",
        "group",
        "state",
        "protocol_type",
        "protocol",
        "members",
        "authorized_operations",
    ],
)

# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
Expand Down
102 changes: 100 additions & 2 deletions test/test_admin_integration.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import pytest

from test.testutil import env_kafka_version
from logging import info
from test.testutil import env_kafka_version, random_string
from threading import Event, Thread
from time import time, sleep

from kafka.errors import NoError
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
from kafka.errors import (NoError, GroupCoordinatorNotAvailableError)


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
Expand Down Expand Up @@ -138,3 +141,98 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):

with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])

@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
    """Describing the never-created group 'test' must raise GroupCoordinatorNotAvailableError."""
    with pytest.raises(GroupCoordinatorNotAvailableError):
        # The call is expected to raise, so its return value is deliberately
        # not bound to a name.
        kafka_admin_client.describe_consumer_groups(['test'])

@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
    """Tests that the describe consumer group call returns valid consumer group information.

    Three consumers are started on background threads and split across two
    groups (two members in the first group, one in the second). Once every
    consumer has an assignment and every group has settled on a single
    generation, both groups are described and their membership, subscription
    and assignment are verified.

    This test takes inspiration from the test 'test_group' in test_consumer_group.py.
    """
    # TODO: move the threaded consumer setup/teardown into a dedicated pytest fixture.
    consumers = {}
    stop = {}
    threads = {}
    random_group_id = 'test-group-' + random_string(6)
    group_id_list = [random_group_id, random_group_id + '_2']

    def consumer_thread(i, group_id):
        # Runs on a worker thread: register a stop event and a consumer under
        # index i, poll until asked to stop, then close and clear both slots.
        assert i not in consumers
        assert i not in stop
        stop[i] = Event()
        consumers[i] = kafka_consumer_factory(group_id=group_id)
        while not stop[i].is_set():
            consumers[i].poll(20)
        consumers[i].close()
        consumers[i] = None
        stop[i] = None

    num_consumers = 3
    for i in range(num_consumers):
        # Even indices join the first group, odd indices the second.
        group_id = group_id_list[i % 2]
        t = Thread(target=consumer_thread, args=(i, group_id,))
        t.start()
        threads[i] = t

    try:
        timeout = time() + 35
        while True:
            for c in range(num_consumers):

                # Verify all consumers have been created
                if c not in consumers:
                    break

                # Verify all consumers have an assignment
                elif not consumers[c].assignment():
                    break

            # If all consumers exist and have an assignment
            else:

                info('All consumers have assignment... checking for stable group')

                # Collect the generation ids seen per group from scratch on
                # every pass; ids carried over from an earlier pass would make
                # a group look split forever after a rebalance.
                generations = dict((gid, set()) for gid in group_id_list)
                for consumer in consumers.values():
                    generations[consumer.config['group_id']].add(consumer._coordinator._generation.generation_id)

                # Every group (not just one of them) must have converged on a
                # single generation.
                is_same_generation = all(
                    len(group_generations) == 1
                    for group_generations in generations.values())

                # New generation assignment is not complete until
                # coordinator.rejoining = False
                rejoining = any(consumer._coordinator.rejoining
                                for consumer in list(consumers.values()))

                if not rejoining and is_same_generation:
                    break
                else:
                    sleep(1)
            assert time() < timeout, "timeout waiting for assignments"

        info('Group stabilized; verifying assignment')
        output = kafka_admin_client.describe_consumer_groups(group_id_list)
        assert len(output) == 2
        consumer_groups = set()
        for consumer_group in output:
            assert consumer_group.group in group_id_list
            # Consumers 0 and 2 joined the first group, consumer 1 the second.
            if consumer_group.group == group_id_list[0]:
                assert len(consumer_group.members) == 2
            else:
                assert len(consumer_group.members) == 1
            for member in consumer_group.members:
                assert member.member_metadata.subscription[0] == topic
                assert member.member_assignment.assignment[0][0] == topic
            consumer_groups.add(consumer_group.group)
        # group_id_list is already in sorted order (second id is first + '_2').
        assert sorted(consumer_groups) == group_id_list
    finally:
        info('Shutting down %s consumers', num_consumers)
        for c in range(num_consumers):
            info('Stopping consumer %s', c)
            stop[c].set()
            threads[c].join()
            threads[c] = None