From 6e5c301e08ff0a8b9fc693ba9cc88d11e5951cd2 Mon Sep 17 00:00:00 2001 From: ekerstens <49325583+ekerstens@users.noreply.github.com> Date: Wed, 15 Dec 2021 07:18:12 -0800 Subject: [PATCH] Remove wait_first and extra log (#240) Co-authored-by: Eric Kerstens --- faust/transport/consumer.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index c67244139..9742943c9 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -733,13 +733,6 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]: # convert timestamp to seconds from int milliseconds. yield tp, to_message(tp, record) - async def _wait_suspend(self): - """Wrapper around self.suspend_flow.wait() with no return value. - - This allows for easily - """ - await self.suspend_flow.wait() - async def _wait_next_records( self, timeout: float ) -> Tuple[Optional[RecordMap], Optional[Set[TP]]]: @@ -760,18 +753,10 @@ async def _wait_next_records( # Fetch records only if active partitions to avoid the risk of # fetching all partitions in the beginning when none of the # partitions is paused/resumed. - _getmany = self._getmany( + records = await self._getmany( active_partitions=active_partitions, timeout=timeout, ) - wait_results = await self.wait_first( - _getmany, - self.suspend_flow.wait(), - ) - for coro, result in zip(wait_results.done, wait_results.results): - if coro is _getmany: - records = result - break else: # We should still release to the event loop await self.sleep(1)