@@ -271,7 +271,48 @@ def _refresh_controller_id(self):
271271 "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
272272 .format (version ))
273273
274- def _find_group_coordinator_id (self , group_id ):
274+ def _find_coordinator_id_send_request (self , group_id ):
275+ """Send a FindCoordinatorRequest to a broker.
276+
277+ :param group_id: The consumer group ID. This is typically the group
278+ name as a string.
279+ :return: A message future
280+ """
281+ # TODO add support for dynamically picking version of
282+ # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
283+ # When I experimented with this, the coordinator value returned in
284+ # GroupCoordinatorResponse_v1 didn't match the value returned by
285+ # GroupCoordinatorResponse_v0 and I couldn't figure out why.
286+ version = 0 # version = self._matching_api_version(GroupCoordinatorRequest)
287+ if version <= 0 :
288+ request = GroupCoordinatorRequest [version ](group_id )
289+ else :
290+ raise NotImplementedError (
291+ "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
292+ .format (version ))
293+ return self ._send_request_to_node (self ._client .least_loaded_node (), request )
294+
295+ def _find_coordinator_id_process_response (self , response ):
296+ """Process a FindCoordinatorResponse.
297+
298+ :param response: a FindCoordinatorResponse.
299+ :return: The node_id of the broker that is the coordinator.
300+ """
301+ if response .API_VERSION <= 0 :
302+ error_type = Errors .for_code (response .error_code )
303+ if error_type is not Errors .NoError :
304+ # Note: When error_type.retriable, Java will retry... see
305+ # KafkaAdminClient's handleFindCoordinatorError method
306+ raise error_type (
307+ "FindCoordinatorRequest failed with response '{}'."
308+ .format (response ))
309+ else :
310+ raise NotImplementedError (
311+ "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
312+ .format (response .API_VERSION ))
313+ return response .coordinator_id
314+
315+ def _find_coordinator_id (self , group_id ):
275316 """Find the broker node_id of the coordinator of the given group.
276317
277318 Sends a FindCoordinatorRequest message to the cluster. Will block until
@@ -283,35 +324,10 @@ def _find_group_coordinator_id(self, group_id):
283324 :return: The node_id of the broker that is the coordinator.
284325 """
285326 # Note: Java may change how this is implemented in KAFKA-6791.
286- #
287- # TODO add support for dynamically picking version of
288- # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
289- # When I experimented with this, GroupCoordinatorResponse_v1 didn't
290- # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
291- gc_request = GroupCoordinatorRequest [0 ](group_id )
292- future = self ._send_request_to_node (self ._client .least_loaded_node (), gc_request )
293-
327+ future = self ._find_coordinator_id_send_request (group_id )
294328 self ._wait_for_futures ([future ])
295-
296- gc_response = future .value
297- # use the extra error checking in add_group_coordinator() rather than
298- # immediately returning the group coordinator.
299- success = self ._client .cluster .add_group_coordinator (group_id , gc_response )
300- if not success :
301- error_type = Errors .for_code (gc_response .error_code )
302- assert error_type is not Errors .NoError
303- # Note: When error_type.retriable, Java will retry... see
304- # KafkaAdminClient's handleFindCoordinatorError method
305- raise error_type (
306- "Could not identify group coordinator for group_id '{}' from response '{}'."
307- .format (group_id , gc_response ))
308- group_coordinator = self ._client .cluster .coordinator_for_group (group_id )
309- # will be None if the coordinator was never populated, which should never happen here
310- assert group_coordinator is not None
311- # will be -1 if add_group_coordinator() failed... but by this point the
312- # error should have been raised.
313- assert group_coordinator != - 1
314- return group_coordinator
329+ response = future .value
330+ return self ._find_coordinator_id_process_response (response )
315331
316332 def _send_request_to_node (self , node_id , request ):
317333 """Send a Kafka protocol message to a specific broker.
@@ -329,7 +345,6 @@ def _send_request_to_node(self, node_id, request):
329345 self ._client .poll ()
330346 return self ._client .send (node_id , request )
331347
332-
333348 def _send_request_to_controller (self , request ):
334349 """Send a Kafka protocol message to the cluster controller.
335350
@@ -678,7 +693,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
678693 if group_coordinator_id is not None :
679694 this_groups_coordinator_id = group_coordinator_id
680695 else :
681- this_groups_coordinator_id = self ._find_group_coordinator_id (group_id )
696+ this_groups_coordinator_id = self ._find_coordinator_id (group_id )
682697 f = self ._describe_consumer_groups_send_request (group_id , this_groups_coordinator_id )
683698 futures .append (f )
684699
@@ -853,7 +868,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
853868 explicitly specified.
854869 """
855870 if group_coordinator_id is None :
856- group_coordinator_id = self ._find_group_coordinator_id (group_id )
871+ group_coordinator_id = self ._find_coordinator_id (group_id )
857872 future = self ._list_consumer_group_offsets_send_request (
858873 group_id , group_coordinator_id , partitions )
859874 self ._wait_for_futures ([future ])
0 commit comments