diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index c67244139..9742943c9 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -733,13 +733,6 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]: # convert timestamp to seconds from int milliseconds. yield tp, to_message(tp, record) - async def _wait_suspend(self): - """Wrapper around self.suspend_flow.wait() with no return value. - - This allows for easily - """ - await self.suspend_flow.wait() - async def _wait_next_records( self, timeout: float ) -> Tuple[Optional[RecordMap], Optional[Set[TP]]]: @@ -760,18 +753,10 @@ async def _wait_next_records( # Fetch records only if active partitions to avoid the risk of # fetching all partitions in the beginning when none of the # partitions is paused/resumed. - _getmany = self._getmany( + records = await self._getmany( active_partitions=active_partitions, timeout=timeout, ) - wait_results = await self.wait_first( - _getmany, - self.suspend_flow.wait(), - ) - for coro, result in zip(wait_results.done, wait_results.results): - if coro is _getmany: - records = result - break else: # We should still release to the event loop await self.sleep(1)