From 107142cedf2b193590847b52d3326d952e60f34b Mon Sep 17 00:00:00 2001
From: Vikram Patki 24489
Date: Wed, 11 Nov 2020 10:17:50 -0500
Subject: [PATCH] fixed recovery hang

---
 faust/tables/recovery.py                      | 87 +++++++++----------
 tests/unit/transport/drivers/test_aiokafka.py |  2 +-
 2 files changed, 42 insertions(+), 47 deletions(-)

diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py
index a27e478e9..a01f96082 100644
--- a/faust/tables/recovery.py
+++ b/faust/tables/recovery.py
@@ -109,7 +109,6 @@ class Recovery(Service):
 
     _signal_recovery_start: Optional[Event] = None
     _signal_recovery_end: Optional[Event] = None
-    _signal_recovery_reset: Optional[Event] = None
 
     completed: Event
     in_recovery: bool = False
@@ -203,13 +202,6 @@ def signal_recovery_end(self) -> Event:
             self._signal_recovery_end = Event(loop=self.loop)
         return self._signal_recovery_end
 
-    @property
-    def signal_recovery_reset(self) -> Event:
-        """Event used to signal that recovery is restarting."""
-        if self._signal_recovery_reset is None:
-            self._signal_recovery_reset = Event(loop=self.loop)
-        return self._signal_recovery_reset
-
     async def on_stop(self) -> None:
         """Call when recovery service stops."""
         # Flush buffers when stopping.
@@ -301,7 +293,9 @@ async def _resume_streams(self) -> None:
         assignment = consumer.assignment()
         if assignment:
             self.log.info("Seek stream partitions to committed offsets.")
-            await self._wait(consumer.perform_seek())
+            await self._wait(
+                consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
+            )
             self.log.dev("Resume stream partitions")
             consumer.resume_partitions(assignment)
         else:
@@ -385,15 +379,18 @@ async def _restart_recovery(self) -> None:
                 )
 
                 for tp in assigned_active_tps:
-                    if active_offsets[tp] and active_highwaters[tp]:
-                        if active_offsets[tp] > active_highwaters[tp]:
-                            raise ConsistencyError(
-                                E_PERSISTED_OFFSET.format(
-                                    tp,
-                                    active_offsets[tp],
-                                    active_highwaters[tp],
-                                ),
-                            )
+                    if (
+                        active_offsets[tp]
+                        and active_highwaters[tp]
+                        and active_offsets[tp] > active_highwaters[tp]
+                    ):
+                        raise ConsistencyError(
+                            E_PERSISTED_OFFSET.format(
+                                tp,
+                                active_offsets[tp],
+                                active_highwaters[tp],
+                            ),
+                        )
 
                 self.log.dev("Build offsets for standby partitions")
                 await self._wait(
@@ -435,7 +432,7 @@ async def _restart_recovery(self) -> None:
                     )
                     self.app._span_add_default_tags(span)
                 try:
-                    await self._wait(self.signal_recovery_end)
+                    await self._wait(self.signal_recovery_end.wait())
                 except Exception as exc:
                     finish_span(self._actives_span, error=exc)
                 else:
@@ -478,15 +475,18 @@ async def _restart_recovery(self) -> None:
                 )
 
                 for tp in standby_tps:
-                    if standby_offsets[tp] and standby_highwaters[tp]:
-                        if standby_offsets[tp] > standby_highwaters[tp]:
-                            raise ConsistencyError(
-                                E_PERSISTED_OFFSET.format(
-                                    tp,
-                                    standby_offsets[tp],
-                                    standby_highwaters[tp],
-                                ),
-                            )
+                    if (
+                        standby_offsets[tp]
+                        and standby_highwaters[tp]
+                        and standby_offsets[tp] > standby_highwaters[tp]
+                    ):
+                        raise ConsistencyError(
+                            E_PERSISTED_OFFSET.format(
+                                tp,
+                                standby_offsets[tp],
+                                standby_highwaters[tp],
+                            ),
+                        )
 
                 if tracer is not None and span:
                     self._standbys_span = tracer.start_span(
@@ -497,6 +497,8 @@ async def _restart_recovery(self) -> None:
                     self.app._span_add_default_tags(span)
                 self.log.dev("Resume standby partitions")
                 T(consumer.resume_partitions)(standby_tps)
+                T(consumer.resume_flow)()
+                T(self.app.flow_control.resume)()
 
                 # Pause all our topic partitions,
                 # to make sure we don't fetch any more records from them.
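The hunks above bound each recovery wait with the broker request timeout and wake up as soon as recovery restarts or completes. A minimal standalone sketch of that wait pattern, assuming hypothetical names (wait_bounded, operation, restart_signal) rather than Faust's actual Service.wait_first implementation:

import asyncio


async def wait_bounded(operation, restart_signal: asyncio.Event, timeout=None) -> bool:
    """Wait for `operation`, but give up early if the restart signal fires or the timeout expires."""
    op_task = asyncio.ensure_future(operation)
    signal_task = asyncio.ensure_future(restart_signal.wait())
    done, pending = await asyncio.wait(
        {op_task, signal_task},
        timeout=timeout,
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()  # drop whichever side did not finish in time
    return op_task in done  # False: recovery restarted or the broker call timed out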
@@ -549,7 +551,7 @@ def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]:
         else:
             return None
 
-    async def _wait(self, coro: WaitArgT, timeout=None) -> None:
+    async def _wait(self, coro: WaitArgT, timeout: int = None) -> None:
         signal = self.signal_recovery_start.wait()
         wait_result = await self.wait_first(coro, signal, timeout=timeout)
         if wait_result.stopped:
@@ -579,12 +581,16 @@ async def on_recovery_completed(self) -> None:
         assignment = consumer.assignment()
         if assignment:
             self.log.info("Seek stream partitions to committed offsets.")
-            await consumer.perform_seek()
+            await self._wait(
+                consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
+            )
         self.completed.set()
         self.log.dev("Resume stream partitions")
         consumer.resume_partitions(
             {tp for tp in assignment if not self._is_changelog_tp(tp)}
         )
+        consumer.resume_flow()
+        self.app.flow_control.resume()
         # finally make sure the fetcher is running.
         await cast(_App, self.app)._fetcher.maybe_start()
         self.tables.on_actives_ready()
@@ -707,13 +713,13 @@ async def _slurp_changelogs(self) -> None:
         processing_times = self._processing_times
 
         def _maybe_signal_recovery_end() -> None:
-            if self.in_recovery and not self.active_remaining_total():
+            if not self.active_remaining_total():
                 # apply anything stuck in the buffers
                 self.flush_buffers()
                 self._set_recovery_ended()
                 if self._actives_span is not None:
                     self._actives_span.set_tag("Actives-Ready", True)
-                logger.info("Setting recovery end")
+                logger.debug("Setting recovery end")
                 self.signal_recovery_end.set()
 
         while not self.should_stop:
@@ -731,15 +737,11 @@ def _maybe_signal_recovery_end() -> None:
             message = event.message
             tp = message.tp
             offset = message.offset
-            logger.info(f"Processing changelog event on {tp} offset {offset}")
 
             offsets: Counter[TP]
             bufsize = buffer_sizes.get(tp)
             is_active = False
             if tp in active_tps:
-                logger.info(
-                    f"Processing changelog active event on {tp} offset {offset}"
-                )
                 is_active = True
                 table = tp_to_table[tp]
                 offsets = active_offsets
@@ -747,9 +749,6 @@ def _maybe_signal_recovery_end() -> None:
                     bufsize = buffer_sizes[tp] = table.recovery_buffer_size
                 active_events_received_at[tp] = now
             elif tp in standby_tps:
-                logger.info(
-                    f"Processing changelog standby event on {tp} offset {offset}"
-                )
                 table = tp_to_table[tp]
                 offsets = standby_offsets
                 if bufsize is None:
@@ -782,7 +781,7 @@ def _maybe_signal_recovery_end() -> None:
                     _maybe_signal_recovery_end()
 
             if not self.standby_remaining_total():
-                logger.info('Completed standby partition fetch')
+                logger.debug("Completed standby partition fetch")
                 if self._standbys_span:
                     finish_span(self._standbys_span)
                     self._standbys_span = None
@@ -825,15 +824,11 @@ def standby_remaining(self) -> Counter[TP]:
 
     def active_remaining_total(self) -> int:
         """Return number of changes remaining for actives to be up-to-date."""
-        var = sum(self.active_remaining().values())
-        logger.debug(f"Recovery still need {var} active offsets")
-        return var
+        return sum(self.active_remaining().values())
 
     def standby_remaining_total(self) -> int:
         """Return number of changes remaining for standbys to be up-to-date."""
-        var = sum(self.standby_remaining().values())
-        logger.debug(f"Recovery still need {var} standby offsets")
-        return var
+        return sum(self.standby_remaining().values())
 
     def active_stats(self) -> RecoveryStatsMapping:
         """Return current active recovery statistics."""
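With the in_recovery guard removed above, _maybe_signal_recovery_end flushes the buffers and sets the end event as soon as no active changelog offsets remain, which is what unblocks the signal_recovery_end.wait() call in _restart_recovery. A simplified sketch of that check, assuming hypothetical stand-ins (remaining, flush_buffers, recovery_ended) for the Recovery service's own state:

import asyncio
from typing import Callable, Counter


def maybe_signal_recovery_end(
    remaining: Counter[str],
    flush_buffers: Callable[[], None],
    recovery_ended: asyncio.Event,
) -> None:
    """Flush buffered changelog records and signal completion once nothing remains."""
    if not sum(remaining.values()):
        flush_buffers()        # apply anything still sitting in the write buffers
        recovery_ended.set()   # wakes whoever is awaiting recovery_ended.wait()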
diff --git a/tests/unit/transport/drivers/test_aiokafka.py b/tests/unit/transport/drivers/test_aiokafka.py
index 354f5c73c..71bcb615a 100644
--- a/tests/unit/transport/drivers/test_aiokafka.py
+++ b/tests/unit/transport/drivers/test_aiokafka.py
@@ -918,7 +918,7 @@ def test_start_coordinator_span(self, *, cthread):
     def test_close(self, *, cthread, _consumer):
         cthread._consumer = _consumer
         cthread.close()
-        _consumer._closed is True
+        assert _consumer._closed
         _consumer._coordinator.close.assert_called_once_with()
 
     def test_close__no_consumer(self, *, cthread):
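The test change above replaces a bare comparison, whose result pytest simply discards, with a real assertion. A tiny illustration of why the old line could never fail (the closed flag here is a hypothetical stand-in for _consumer._closed):

def test_bare_comparison_never_fails():
    closed = False
    closed is True   # evaluated and thrown away: this line can never fail the test
    # assert closed  # an actual assert is what would enforce the expectation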