Skip to content

Commit

Permalink
Return future from commit_offsets_async
Browse files Browse the repository at this point in the history
The callers of commit_offsets_async seem to imply it should return a future
(eg. future = commit_offsets_async(...)), but it does not. This causes a mismatch
between KafkaConsumer.commit_async()'s documentation and its behaviour.

This fixes that by returning futures that were already available,
we just had to actually return them.
  • Loading branch information
ekimekim committed Jul 30, 2018
1 parent 9ac3cb1 commit a0e0677
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,13 @@ def commit_offsets_async(self, offsets, callback=None):
response will be either an Exception or a OffsetCommitResponse
struct. This callback can be used to trigger custom actions when
a commit request completes.
Returns:
kafka.future.Future
"""
self._invoke_completed_offset_commit_callbacks()
if not self.coordinator_unknown():
self._do_commit_offsets_async(offsets, callback)
future = self._do_commit_offsets_async(offsets, callback)
else:
# we don't know the current coordinator, so try to find it and then
# send the commit or fail (we don't want recursive retries which can
Expand All @@ -464,6 +467,8 @@ def commit_offsets_async(self, offsets, callback=None):
# through delayed task execution.
self._client.poll(timeout_ms=0) # no wakeup if we add that feature

return future

def _do_commit_offsets_async(self, offsets, callback=None):
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
Expand Down

0 comments on commit a0e0677

Please sign in to comment.