Skip to content

Use futures to parallelize calls to _send_request_to_node() #1807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 21, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 75 additions & 34 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,11 @@ def _refresh_controller_id(self):
version = self._matching_api_version(MetadataRequest)
if 1 <= version <= 6:
request = MetadataRequest[version]()
response = self._send_request_to_node(self._client.least_loaded_node(), request)
future = self._send_request_to_node(self._client.least_loaded_node(), request)

self._wait_for_futures([future])

response = future.value
controller_id = response.controller_id
# verify the controller is new enough to support our requests
controller_version = self._client.check_version(controller_id)
Expand Down Expand Up @@ -281,7 +285,11 @@ def _find_group_coordinator_id(self, group_id):
# When I experimented with this, GroupCoordinatorResponse_v1 didn't
# match GroupCoordinatorResponse_v0 and I couldn't figure out why.
gc_request = GroupCoordinatorRequest[0](group_id)
gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
future = self._send_request_to_node(self._client.least_loaded_node(), gc_request)

self._wait_for_futures([future])

gc_response = future.value
# use the extra error checking in add_group_coordinator() rather than
# immediately returning the group coordinator.
success = self._client.cluster.add_group_coordinator(group_id, gc_response)
Expand All @@ -304,23 +312,19 @@ def _find_group_coordinator_id(self, group_id):
def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.

Will block until the message result is received.
Returns a future that may be polled for status and results.

:param node_id: The broker id to which to send the message.
:param request: The message to send.
:return: The Kafka protocol response for the message.
:return: A future object that may be polled for status and results.
:exception: The exception if the message could not be sent.
"""
while not self._client.ready(node_id):
# poll until the connection to broker is ready, otherwise send()
# will fail with NodeNotReadyError
self._client.poll()
future = self._client.send(node_id, request)
self._client.poll(future=future)
if future.succeeded():
return future.value
else:
raise future.exception # pylint: disable-msg=raising-bad-type
return self._client.send(node_id, request)


def _send_request_to_controller(self, request):
"""Send a Kafka protocol message to the cluster controller.
Expand All @@ -333,7 +337,11 @@ def _send_request_to_controller(self, request):
tries = 2 # in case our cached self._controller_id is outdated
while tries:
tries -= 1
response = self._send_request_to_node(self._controller_id, request)
future = self._send_request_to_node(self._controller_id, request)

self._wait_for_futures([future])

response = future.value
# In Java, the error fieldname is inconsistent:
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
# - DeleteTopicsResponse uses topic_error_codes
Expand Down Expand Up @@ -490,7 +498,11 @@ def describe_configs(self, config_resources, include_synonyms=False):
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
.format(version))
return self._send_request_to_node(self._client.least_loaded_node(), request)
future = self._send_request_to_node(self._client.least_loaded_node(), request)

self._wait_for_futures([future])

return future.value

@staticmethod
def _convert_alter_config_resource_request(config_resource):
Expand Down Expand Up @@ -529,7 +541,11 @@ def alter_configs(self, config_resources):
# // a single request that may be sent to any broker.
#
# So this is currently broken as it always sends to the least_loaded_node()
return self._send_request_to_node(self._client.least_loaded_node(), request)
future = self._send_request_to_node(self._client.least_loaded_node(), request)

self._wait_for_futures([future])

return future.value

# alter replica logs dir protocol not yet implemented
# Note: have to lookup the broker with the replica assignment and send the request to that broker
Expand Down Expand Up @@ -605,42 +621,50 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
partition assignments.
"""
group_descriptions = []
futures = []
version = self._matching_api_version(DescribeGroupsRequest)
for group_id in group_ids:
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)

if version <= 1:
# Note: KAFKA-6788 A potential optimization is to group the
# request per coordinator and send one request with a list of
# all consumer groups. Java still hasn't implemented this
# because the error checking is hard to get right when some
# groups error and others don't.
request = DescribeGroupsRequest[version](groups=(group_id,))
response = self._send_request_to_node(this_groups_coordinator_id, request)
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
# hardcoding tuple indices here. Several Java examples,
# including KafkaAdminClient.java
group_description = response.groups[0]
error_code = group_description[0]
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
# TODO Java checks the group protocol type, and if consumer
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
# the members' partition assignments... that hasn't yet been
# implemented here so just return the raw struct results
group_descriptions.append(group_description)
futures.append(self._send_request_to_node(this_groups_coordinator_id, request))
else:
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
.format(version))

self._wait_for_futures(futures)

for future in futures:
response = future.value
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
# hardcoding tuple indices here. Several Java examples,
# including KafkaAdminClient.java
group_description = response.groups[0]
error_code = group_description[0]
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
# TODO Java checks the group protocol type, and if consumer
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
# the members' partition assignments... that hasn't yet been
# implemented here so just return the raw struct results
group_descriptions.append(group_description)

return group_descriptions

def list_consumer_groups(self, broker_ids=None):
Expand Down Expand Up @@ -673,13 +697,19 @@ def list_consumer_groups(self, broker_ids=None):
# consumer groups move to new brokers that haven't yet been queried,
# then the same group could be returned by multiple brokers.
consumer_groups = set()
futures = []
if broker_ids is None:
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
version = self._matching_api_version(ListGroupsRequest)
if version <= 2:
request = ListGroupsRequest[version]()
for broker_id in broker_ids:
response = self._send_request_to_node(broker_id, request)
futures.append(self._send_request_to_node(broker_id, request))

self._wait_for_futures(futures)

for future in futures:
response = future.value
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
Expand Down Expand Up @@ -738,7 +768,10 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
request = OffsetFetchRequest[version](group_id, topics_partitions)
response = self._send_request_to_node(group_coordinator_id, request)
future = self._send_request_to_node(group_coordinator_id, request)
self._wait_for_futures([future])
response = future.value

if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
Expand All @@ -764,3 +797,11 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,

# delete groups protocol not yet implemented
# Note: send the request to the group's coordinator.

def _wait_for_futures(self, futures):
while not all(future.succeeded() for future in futures):
for future in futures:
self._client.poll(future=future)

if future.failed():
raise future.exception # pylint: disable-msg=raising-bad-type