From 499e338b0545d8f55799a2ff0d1a6ab07b692381 Mon Sep 17 00:00:00 2001 From: ekerstens <49325583+ekerstens@users.noreply.github.com> Date: Fri, 10 Dec 2021 13:46:32 -0800 Subject: [PATCH] Record generation_id after fetch (#235) Co-authored-by: Eric Kerstens --- faust/transport/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index a8d6adcfd..401e35bef 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -701,8 +701,8 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]: # has 1 partition, then t2 will end up being starved most of the time. # # We solve this by going round-robin through each topic. - generation_id = self.app.consumer_generation_id records, active_partitions = await self._wait_next_records(timeout) + generation_id = self.app.consumer_generation_id if records is None or self.should_stop: return