Stop fetching when flow stops #253

Merged
3 changes: 3 additions & 0 deletions faust/app/base.py
@@ -1681,6 +1681,9 @@ async def _on_partitions_revoked(self, revoked: Set[TP]) -> None:
            T(self.flow_control.suspend)()
            on_timeout.info("consumer.pause_partitions")
            T(consumer.pause_partitions)(assignment)
+           on_timeout.info("consumer.wait_for_stopped_flow")
+           await T(consumer.wait_for_stopped_flow)()
+
            # Every agent instance has an incoming buffer of messages
            # (an asyncio.Queue) -- we clear those to make sure
            # agents will not start processing them.
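Taken together, the revocation path is now a three-step handshake: suspend app-level flow control, pause the assignment, then wait for the fetch loop to confirm it is no longer blocked waiting on new records. The sketch below models that handshake with plain asyncio events; the class and method names are illustrative, not Faust's actual API.

```python
import asyncio


class FlowGate:
    """Toy model of the consumer's two flow events (illustrative only)."""

    def __init__(self) -> None:
        self.suspend_flow = asyncio.Event()
        self.not_waiting_next_records = asyncio.Event()
        self.not_waiting_next_records.set()  # starts set: no fetch in flight

    async def fetch_once(self) -> None:
        # One fetch cycle, bracketed the same way as _wait_next_records.
        self.not_waiting_next_records.clear()
        try:
            await asyncio.sleep(0.1)  # stand-in for a broker fetch
        finally:
            # Always signal, even if the fetch raised or was cancelled.
            self.not_waiting_next_records.set()

    async def on_partitions_revoked(self) -> None:
        self.suspend_flow.set()
        # Park the rebalance callback until any in-flight fetch settles,
        # so records fetched mid-revocation never reach the agents.
        await self.not_waiting_next_records.wait()
```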
12 changes: 6 additions & 6 deletions faust/tables/recovery.py
@@ -309,21 +309,21 @@ async def _resume_streams(self, generation_id: int = 0) -> None:
            self.log.warning("Recovery rebalancing again")
            return
        if assignment:
+           self.log.dev("Resume stream partitions")
            consumer.resume_partitions(
                {tp for tp in assignment if not self._is_changelog_tp(tp)}
            )
            self.log.info("Seek stream partitions to committed offsets.")
            await self._wait(
                consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
            )
-           self.log.dev("Resume stream partitions")
        else:
            self.log.info("Resuming streams with empty assignment")
        self.completed.set()
        # Resume partitions and start fetching.
        self.log.info("Resuming flow...")
-       consumer.resume_flow()
        app.flow_control.resume()
+       consumer.resume_flow()
        # finally make sure the fetcher is running.
        await cast(_App, app)._fetcher.maybe_start()
        self.tables.on_actives_ready()
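All six reorderings in this file move `app.flow_control.resume()` ahead of `consumer.resume_flow()`. One plausible reading, an assumption rather than something the PR states: resuming app-level flow control first closes the window in which the newly woken fetcher could hand records to streams that are still suspended. Condensed into a hypothetical helper:

```python
def resume_after_recovery(app, consumer) -> None:
    # Hypothetical helper naming the pattern repeated in recovery.py.
    app.flow_control.resume()  # 1) streams may process events again
    consumer.resume_flow()     # 2) fetcher may return records again
```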
@@ -440,9 +440,9 @@ async def _restart_recovery(self) -> None:
            T(consumer.resume_partitions)(active_tps)
            # Resume partitions and start fetching.
            self.log.info("Resuming flow...")
+           T(self.app.flow_control.resume)()
            T(consumer.resume_flow)()
            await T(cast(_App, self.app)._fetcher.maybe_start)()
-           T(self.app.flow_control.resume)()

            # Wait for actives to be up to date.
            # This signal will be set by _slurp_changelogs
@@ -467,8 +467,8 @@ async def _restart_recovery(self) -> None:
                T(consumer.pause_partitions)(active_tps)
            else:
                self.log.info("Resuming flow...")
-               T(consumer.resume_flow)()
                T(self.app.flow_control.resume)()
+               T(consumer.resume_flow)()
            self._set_recovery_ended()
            self.log.info("Recovery complete")
            if span:
@@ -519,8 +519,8 @@ async def _restart_recovery(self) -> None:
            self.app._span_add_default_tags(span)
            self.log.dev("Resume standby partitions")
            T(consumer.resume_partitions)(standby_tps)
-           T(consumer.resume_flow)()
            T(self.app.flow_control.resume)()
+           T(consumer.resume_flow)()

            # Pause all our topic partitions,
            # to make sure we don't fetch any more records from them.
@@ -625,8 +625,8 @@ async def on_recovery_completed(self, generation_id: int = 0) -> None:
            )
            self.completed.set()
            self.log.dev("Resume stream partitions")
-           consumer.resume_flow()
            self.app.flow_control.resume()
+           consumer.resume_flow()
            # finally make sure the fetcher is running.
            await cast(_App, self.app)._fetcher.maybe_start()
            self.tables.on_actives_ready()
63 changes: 43 additions & 20 deletions faust/transport/consumer.py
@@ -436,6 +436,7 @@ class Consumer(Service, ConsumerT):
    flow_active: bool = True
    can_resume_flow: Event
    suspend_flow: Event
+   not_waiting_next_records: Event

    def __init__(
        self,
@@ -477,6 +478,8 @@ def __init__(
        self.randomly_assigned_topics = set()
        self.can_resume_flow = Event()
        self.suspend_flow = Event()
+       self.not_waiting_next_records = Event()
+       self.not_waiting_next_records.set()
        self._reset_state()
        super().__init__(loop=loop or self.transport.loop, **kwargs)
        self.transactions = self.transport.create_transaction_manager(
@@ -500,6 +503,7 @@ def _reset_state(self) -> None:
        self._buffered_partitions = set()
        self.can_resume_flow.clear()
        self.suspend_flow.clear()
+       self.not_waiting_next_records.set()
        self.flow_active = True
        self._time_start = monotonic()

@@ -581,6 +585,11 @@ def stop_flow(self) -> None:
        self.can_resume_flow.clear()
        self.suspend_flow.set()

+   async def wait_for_stopped_flow(self) -> None:
+       """Wait until the consumer is not waiting on any newly fetched records."""
+       if not self.not_waiting_next_records.is_set():
+           await self.not_waiting_next_records.wait()
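`wait_for_stopped_flow` returns immediately when no fetch is in flight (the event starts set and is always restored in the `finally` of `_wait_next_records` below); otherwise it parks until the current fetch cycle settles. A caller-side sketch, with an illustrative timeout guard that is not part of this PR:

```python
import asyncio


async def drain_before_rebalance(consumer) -> None:
    # `consumer` and the 30s bound are illustrative assumptions.
    consumer.stop_flow()  # sets suspend_flow, clears can_resume_flow
    try:
        await asyncio.wait_for(consumer.wait_for_stopped_flow(), timeout=30.0)
    except asyncio.TimeoutError:
        # Even if we give up waiting, records fetched from here on are
        # discarded by _wait_next_records while suspend_flow is set.
        pass
```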

    def resume_flow(self) -> None:
        """Allow consumer to process messages."""
        self.flow_active = True
@@ -704,6 +713,7 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]:
        # has 1 partition, then t2 will end up being starved most of the time.
        #
        # We solve this by going round-robin through each topic.
+
        records, active_partitions = await self._wait_next_records(timeout)
        generation_id = self.app.consumer_generation_id
        if records is None or self.should_stop:
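The starvation comment above describes an existing design point: fetched batches are interleaved topic by topic instead of drained one topic at a time. A minimal stand-alone sketch of such a round-robin interleave (illustrative; Faust's real iterator differs):

```python
from collections import deque
from typing import Any, Dict, Iterator, List, Tuple


def roundrobin(records: Dict[str, List[Any]]) -> Iterator[Tuple[str, Any]]:
    """Yield one message per topic in turn so no single topic starves."""
    queues = {topic: deque(msgs) for topic, msgs in records.items()}
    while queues:
        for topic in list(queues):
            yield topic, queues[topic].popleft()
            if not queues[topic]:
                del queues[topic]
```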
@@ -739,28 +749,41 @@ async def _wait_next_records(
        if not self.flow_active:
            await self.wait(self.can_resume_flow)
        # Implementation for the Fetcher service.
+       try:
+           self.not_waiting_next_records.clear()
+
-       is_client_only = self.app.client_only
-
-       active_partitions: Optional[Set[TP]]
-       if is_client_only:
-           active_partitions = None
-       else:
-           active_partitions = self._get_active_partitions()
+           is_client_only = self.app.client_only
+
-       records: RecordMap = {}
-       if is_client_only or active_partitions:
-           # Fetch records only if active partitions to avoid the risk of
-           # fetching all partitions in the beginning when none of the
-           # partitions is paused/resumed.
-           records = await self._getmany(
-               active_partitions=active_partitions,
-               timeout=timeout,
-           )
-       else:
-           # We should still release to the event loop
-           await self.sleep(1)
-       return records, active_partitions
+           active_partitions: Optional[Set[TP]]
+           if is_client_only:
+               active_partitions = None
+           else:
+               active_partitions = self._get_active_partitions()
+
+           records: RecordMap = {}
+           if is_client_only or active_partitions:
+               # Fetch records only if active partitions to avoid the risk of
+               # fetching all partitions in the beginning when none of the
+               # partitions is paused/resumed.
+               suspend_flow = self.suspend_flow.wait()
+               getmany = self._getmany(
+                   active_partitions=active_partitions,
+                   timeout=timeout,
+               )
+               wait_results = await self.wait_first(getmany, suspend_flow)
+               for coro, result in zip(wait_results.done, wait_results.results):
+                   # Ignore records fetched while flow was suspended
+                   if coro is suspend_flow:
+                       records = {}
+                       break
+                   if coro is getmany:
+                       records = result
+           else:
+               # We should still release to the event loop
+               await self.sleep(1)
+           return records, active_partitions
+       finally:
+           self.not_waiting_next_records.set()
Collaborator: Don't we want to process the exception?
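On the question above: the `try`/`finally` does not swallow anything; an exception raised by `_getmany` still propagates after the event is restored. As for the race itself, the code relies on Mode's `Service.wait_first`; the stand-alone approximation below mirrors only the semantics the new code depends on, and is an assumption, not Faust code:

```python
import asyncio
from typing import Any, Awaitable, Dict


async def race_fetch(fetch: Awaitable[Dict[Any, Any]],
                     suspend_flow: asyncio.Event) -> Dict[Any, Any]:
    fetch_task = asyncio.ensure_future(fetch)
    suspended = asyncio.ensure_future(suspend_flow.wait())
    done, pending = await asyncio.wait(
        {fetch_task, suspended}, return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()
    if suspended in done:
        # Flow was suspended: ignore anything the fetch may have returned.
        return {}
    return fetch_task.result()
```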


    @abc.abstractmethod
    def _to_message(self, tp: TP, record: Any) -> ConsumerMessage:
1 change: 1 addition & 0 deletions tests/unit/app/test_base.py
@@ -299,6 +299,7 @@ async def test_on_partitions_revoked(self, *, app):
            transactions=Mock(
                on_partitions_revoked=AsyncMock(),
            ),
+           wait_for_stopped_flow=AsyncMock(),
        )
        app.tables = Mock()
        app.flow_control = Mock()
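The new `AsyncMock` is needed because `_on_partitions_revoked` now awaits `consumer.wait_for_stopped_flow()`, and a plain `Mock` attribute would return a non-awaitable. A natural follow-up assertion (illustrative, not part of this PR):

```python
# After driving app._on_partitions_revoked(...) in the test body:
consumer.wait_for_stopped_flow.assert_awaited_once()
```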