Skip to content

Feature/multithreading #1801

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 69 additions & 43 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.version import __version__

from threading import Thread


log = logging.getLogger(__name__)

Expand Down Expand Up @@ -586,7 +588,43 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
# describe delegation_token protocol not yet implemented
# Note: send the request to the least_loaded_node()

def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
def describe_broker_consumer_group(self, group_coordinator_id, group_descriptions, group_id, version):
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
if version <= 1:
# Note: KAFKA-6788 A potential optimization is to group the
# request per coordinator and send one request with a list of
# all consumer groups. Java still hasn't implemented this
# because the error checking is hard to get right when some
# groups error and others don't.
request = DescribeGroupsRequest[version](groups=(group_id,))
response = self._send_request_to_node(this_groups_coordinator_id, request)
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
# hardcoding tuple indices here. Several Java examples,
# including KafkaAdminClient.java
group_description = response.groups[0]
error_code = group_description[0]
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
# TODO Java checks the group protocol type, and if consumer
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
# the members' partition assignments... that hasn't yet been
# implemented here so just return the raw struct results
group_descriptions.append(group_description)
else:
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
.format(version))

def describe_consumer_groups(self, group_ids, group_coordinator_id=None, timeout=None):
"""Describe a set of consumer groups.

Any errors are immediately raised.
Expand All @@ -606,44 +644,28 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
"""
group_descriptions = []
version = self._matching_api_version(DescribeGroupsRequest)
threads = []
for group_id in group_ids:
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
if version <= 1:
# Note: KAFKA-6788 A potential optimization is to group the
# request per coordinator and send one request with a list of
# all consumer groups. Java still hasn't implemented this
# because the error checking is hard to get right when some
# groups error and others don't.
request = DescribeGroupsRequest[version](groups=(group_id,))
response = self._send_request_to_node(this_groups_coordinator_id, request)
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
# hardcoding tuple indices here. Several Java examples,
# including KafkaAdminClient.java
group_description = response.groups[0]
error_code = group_description[0]
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
# TODO Java checks the group protocol type, and if consumer
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
# the members' partition assignments... that hasn't yet been
# implemented here so just return the raw struct results
group_descriptions.append(group_description)
else:
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
.format(version))
t = Thread(target=self.describe_broker_consumer_group,
args=(group_coordinator_id, group_descriptions, group_id, version))
threads.append(t)
t.start()

for thread in threads:
thread.join(timeout=timeout)
return group_descriptions

def list_consumer_groups(self, broker_ids=None):
def list_broker_consumer_offsets(self, broker_id, consumer_groups, request):
response = self._send_request_to_node(broker_id, request)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))

consumer_groups.update(response.groups)

def list_consumer_groups(self, broker_ids=None, timeout=None):
"""List all consumer groups known to the cluster.

This returns a list of Consumer Group tuples. The tuples are
Expand Down Expand Up @@ -675,21 +697,25 @@ def list_consumer_groups(self, broker_ids=None):
consumer_groups = set()
if broker_ids is None:
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]

version = self._matching_api_version(ListGroupsRequest)
if version <= 2:
request = ListGroupsRequest[version]()
threads = []

for broker_id in broker_ids:
response = self._send_request_to_node(broker_id, request)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
consumer_groups.update(response.groups)
t = Thread(target=self.list_broker_consumer_offsets,
args=(broker_id, consumer_groups, request))
threads.append(t)
t.start()

for thread in threads:
thread.join(timeout=timeout)
else:
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdminClient."
.format(version))

return list(consumer_groups)

def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
Expand Down