From a0e0677dd44f242ba38e92524512d6940e696fbc Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 30 Jul 2018 14:13:01 -0700 Subject: [PATCH] Return future from commit_offsets_async The callers of commit_offsets_async seem to imply it should return a future (eg. future = commit_offsets_async(...)), but it does not. This causes a mismatch between KafkaConsumer.commit_async()'s documentation and its behaviour. This fixes that by returning futures that were already available, we just had to actually return them. --- kafka/coordinator/consumer.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index f90d1821d..647a6b585 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -441,10 +441,13 @@ def commit_offsets_async(self, offsets, callback=None): response will be either an Exception or a OffsetCommitResponse struct. This callback can be used to trigger custom actions when a commit request completes. + + Returns: + kafka.future.Future """ self._invoke_completed_offset_commit_callbacks() if not self.coordinator_unknown(): - self._do_commit_offsets_async(offsets, callback) + future = self._do_commit_offsets_async(offsets, callback) else: # we don't know the current coordinator, so try to find it and then # send the commit or fail (we don't want recursive retries which can @@ -464,6 +467,8 @@ def commit_offsets_async(self, offsets, callback=None): # through delayed task execution. self._client.poll(timeout_ms=0) # no wakeup if we add that feature + return future + def _do_commit_offsets_async(self, offsets, callback=None): assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' assert all(map(lambda k: isinstance(k, TopicPartition), offsets))