Skip to content

Commit

Permalink
try/finally for not_waiting_next_records
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Kerstens committed Jan 7, 2022
1 parent c04cbee commit 032436b
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,39 +749,41 @@ async def _wait_next_records(
if not self.flow_active:
await self.wait(self.can_resume_flow)
# Implementation for the Fetcher service.
self.not_waiting_next_records.clear()

is_client_only = self.app.client_only
try:
self.not_waiting_next_records.clear()

active_partitions: Optional[Set[TP]]
if is_client_only:
active_partitions = None
else:
active_partitions = self._get_active_partitions()
is_client_only = self.app.client_only

records: RecordMap = {}
if is_client_only or active_partitions:
# Fetch records only if active partitions to avoid the risk of
# fetching all partitions in the beginning when none of the
# partitions is paused/resumed.
suspend_flow = self.suspend_flow.wait()
getmany = self._getmany(
active_partitions=active_partitions,
timeout=timeout,
)
wait_results = await self.wait_first(getmany, suspend_flow)
for coro, result in zip(wait_results.done, wait_results.results):
# Ignore records fetched while flow was suspended
if coro is suspend_flow:
records = {}
break
if coro is getmany:
records = result
else:
# We should still release to the event loop
await self.sleep(1)
self.not_waiting_next_records.set()
return records, active_partitions
active_partitions: Optional[Set[TP]]
if is_client_only:
active_partitions = None
else:
active_partitions = self._get_active_partitions()

records: RecordMap = {}
if is_client_only or active_partitions:
# Fetch records only if active partitions to avoid the risk of
# fetching all partitions in the beginning when none of the
# partitions is paused/resumed.
suspend_flow = self.suspend_flow.wait()
getmany = self._getmany(
active_partitions=active_partitions,
timeout=timeout,
)
wait_results = await self.wait_first(getmany, suspend_flow)
for coro, result in zip(wait_results.done, wait_results.results):
# Ignore records fetched while flow was suspended
if coro is suspend_flow:
records = {}
break
if coro is getmany:
records = result
else:
# We should still release to the event loop
await self.sleep(1)
return records, active_partitions
finally:
self.not_waiting_next_records.set()

@abc.abstractmethod
def _to_message(self, tp: TP, record: Any) -> ConsumerMessage:
Expand Down

0 comments on commit 032436b

Please sign in to comment.