diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 4fd8a1b33..badac324b 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -271,7 +271,49 @@ def _refresh_controller_id(self): "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}." .format(version)) - def _find_group_coordinator_id(self, group_id): + def _find_coordinator_id_send_request(self, group_id): + """Send a FindCoordinatorRequest to a broker. + + :param group_id: The consumer group ID. This is typically the group + name as a string. + :return: A message future + """ + # TODO add support for dynamically picking version of + # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. + # When I experimented with this, the coordinator value returned in + # GroupCoordinatorResponse_v1 didn't match the value returned by + # GroupCoordinatorResponse_v0 and I couldn't figure out why. + version = 0 + # version = self._matching_api_version(GroupCoordinatorRequest) + if version <= 0: + request = GroupCoordinatorRequest[version](group_id) + else: + raise NotImplementedError( + "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." + .format(version)) + return self._send_request_to_node(self._client.least_loaded_node(), request) + + def _find_coordinator_id_process_response(self, response): + """Process a FindCoordinatorResponse. + + :param response: a FindCoordinatorResponse. + :return: The node_id of the broker that is the coordinator. + """ + if response.API_VERSION <= 0: + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + # Note: When error_type.retriable, Java will retry... see + # KafkaAdminClient's handleFindCoordinatorError method + raise error_type( + "FindCoordinatorRequest failed with response '{}'." + .format(response)) + else: + raise NotImplementedError( + "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient." + .format(response.API_VERSION)) + return response.coordinator_id + + def _find_coordinator_id(self, group_id): """Find the broker node_id of the coordinator of the given group. Sends a FindCoordinatorRequest message to the cluster. Will block until @@ -283,35 +325,10 @@ def _find_group_coordinator_id(self, group_id): :return: The node_id of the broker that is the coordinator. """ # Note: Java may change how this is implemented in KAFKA-6791. - # - # TODO add support for dynamically picking version of - # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. - # When I experimented with this, GroupCoordinatorResponse_v1 didn't - # match GroupCoordinatorResponse_v0 and I couldn't figure out why. - gc_request = GroupCoordinatorRequest[0](group_id) - future = self._send_request_to_node(self._client.least_loaded_node(), gc_request) - + future = self._find_coordinator_id_send_request(group_id) self._wait_for_futures([future]) - - gc_response = future.value - # use the extra error checking in add_group_coordinator() rather than - # immediately returning the group coordinator. - success = self._client.cluster.add_group_coordinator(group_id, gc_response) - if not success: - error_type = Errors.for_code(gc_response.error_code) - assert error_type is not Errors.NoError - # Note: When error_type.retriable, Java will retry... see - # KafkaAdminClient's handleFindCoordinatorError method - raise error_type( - "Could not identify group coordinator for group_id '{}' from response '{}'." - .format(group_id, gc_response)) - group_coordinator = self._client.cluster.coordinator_for_group(group_id) - # will be None if the coordinator was never populated, which should never happen here - assert group_coordinator is not None - # will be -1 if add_group_coordinator() failed... but by this point the - # error should have been raised. - assert group_coordinator != -1 - return group_coordinator + response = future.value + return self._find_coordinator_id_process_response(response) def _send_request_to_node(self, node_id, request): """Send a Kafka protocol message to a specific broker. @@ -329,7 +346,6 @@ def _send_request_to_node(self, node_id, request): self._client.poll() return self._client.send(node_id, request) - def _send_request_to_controller(self, request): """Send a Kafka protocol message to the cluster controller. @@ -678,7 +694,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None): if group_coordinator_id is not None: this_groups_coordinator_id = group_coordinator_id else: - this_groups_coordinator_id = self._find_group_coordinator_id(group_id) + this_groups_coordinator_id = self._find_coordinator_id(group_id) f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id) futures.append(f) @@ -853,7 +869,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, explicitly specified. """ if group_coordinator_id is None: - group_coordinator_id = self._find_group_coordinator_id(group_id) + group_coordinator_id = self._find_coordinator_id(group_id) future = self._list_consumer_group_offsets_send_request( group_id, group_coordinator_id, partitions) self._wait_for_futures([future])