Skip to content

Commit

Permalink
fixed recovery hang
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Nov 11, 2020
1 parent faa1237 commit 107142c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 47 deletions.
87 changes: 41 additions & 46 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ class Recovery(Service):

_signal_recovery_start: Optional[Event] = None
_signal_recovery_end: Optional[Event] = None
_signal_recovery_reset: Optional[Event] = None

completed: Event
in_recovery: bool = False
Expand Down Expand Up @@ -203,13 +202,6 @@ def signal_recovery_end(self) -> Event:
self._signal_recovery_end = Event(loop=self.loop)
return self._signal_recovery_end

@property
def signal_recovery_reset(self) -> Event:
"""Event used to signal that recovery is restarting."""
if self._signal_recovery_reset is None:
self._signal_recovery_reset = Event(loop=self.loop)
return self._signal_recovery_reset

async def on_stop(self) -> None:
"""Call when recovery service stops."""
# Flush buffers when stopping.
Expand Down Expand Up @@ -301,7 +293,9 @@ async def _resume_streams(self) -> None:
assignment = consumer.assignment()
if assignment:
self.log.info("Seek stream partitions to committed offsets.")
await self._wait(consumer.perform_seek())
await self._wait(
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
)
self.log.dev("Resume stream partitions")
consumer.resume_partitions(assignment)
else:
Expand Down Expand Up @@ -385,15 +379,18 @@ async def _restart_recovery(self) -> None:
)

for tp in assigned_active_tps:
if active_offsets[tp] and active_highwaters[tp]:
if active_offsets[tp] > active_highwaters[tp]:
raise ConsistencyError(
E_PERSISTED_OFFSET.format(
tp,
active_offsets[tp],
active_highwaters[tp],
),
)
if (
active_offsets[tp]
and active_highwaters[tp]
and active_offsets[tp] > active_highwaters[tp]
):
raise ConsistencyError(
E_PERSISTED_OFFSET.format(
tp,
active_offsets[tp],
active_highwaters[tp],
),
)

self.log.dev("Build offsets for standby partitions")
await self._wait(
Expand Down Expand Up @@ -435,7 +432,7 @@ async def _restart_recovery(self) -> None:
)
self.app._span_add_default_tags(span)
try:
await self._wait(self.signal_recovery_end)
await self._wait(self.signal_recovery_end.wait())
except Exception as exc:
finish_span(self._actives_span, error=exc)
else:
Expand Down Expand Up @@ -478,15 +475,18 @@ async def _restart_recovery(self) -> None:
)

for tp in standby_tps:
if standby_offsets[tp] and standby_highwaters[tp]:
if standby_offsets[tp] > standby_highwaters[tp]:
raise ConsistencyError(
E_PERSISTED_OFFSET.format(
tp,
standby_offsets[tp],
standby_highwaters[tp],
),
)
if (
standby_offsets[tp]
and standby_highwaters[tp]
and standby_offsets[tp] > standby_highwaters[tp]
):
raise ConsistencyError(
E_PERSISTED_OFFSET.format(
tp,
standby_offsets[tp],
standby_highwaters[tp],
),
)

if tracer is not None and span:
self._standbys_span = tracer.start_span(
Expand All @@ -497,6 +497,8 @@ async def _restart_recovery(self) -> None:
self.app._span_add_default_tags(span)
self.log.dev("Resume standby partitions")
T(consumer.resume_partitions)(standby_tps)
T(consumer.resume_flow)()
T(self.app.flow_control.resume)()

# Pause all our topic partitions,
# to make sure we don't fetch any more records from them.
Expand Down Expand Up @@ -549,7 +551,7 @@ def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]:
else:
return None

async def _wait(self, coro: WaitArgT, timeout=None) -> None:
async def _wait(self, coro: WaitArgT, timeout: int = None) -> None:
signal = self.signal_recovery_start.wait()
wait_result = await self.wait_first(coro, signal, timeout=timeout)
if wait_result.stopped:
Expand Down Expand Up @@ -579,12 +581,16 @@ async def on_recovery_completed(self) -> None:
assignment = consumer.assignment()
if assignment:
self.log.info("Seek stream partitions to committed offsets.")
await consumer.perform_seek()
await self._wait(
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
)
self.completed.set()
self.log.dev("Resume stream partitions")
consumer.resume_partitions(
{tp for tp in assignment if not self._is_changelog_tp(tp)}
)
consumer.resume_flow()
self.app.flow_control.resume()
# finally make sure the fetcher is running.
await cast(_App, self.app)._fetcher.maybe_start()
self.tables.on_actives_ready()
Expand Down Expand Up @@ -707,13 +713,13 @@ async def _slurp_changelogs(self) -> None:
processing_times = self._processing_times

def _maybe_signal_recovery_end() -> None:
if self.in_recovery and not self.active_remaining_total():
if not self.active_remaining_total():
# apply anything stuck in the buffers
self.flush_buffers()
self._set_recovery_ended()
if self._actives_span is not None:
self._actives_span.set_tag("Actives-Ready", True)
logger.info("Setting recovery end")
logger.debug("Setting recovery end")
self.signal_recovery_end.set()

while not self.should_stop:
Expand All @@ -731,25 +737,18 @@ def _maybe_signal_recovery_end() -> None:
message = event.message
tp = message.tp
offset = message.offset
logger.info(f"Processing changelog event on {tp} offset {offset}")

offsets: Counter[TP]
bufsize = buffer_sizes.get(tp)
is_active = False
if tp in active_tps:
logger.info(
f"Processing changelog active event on {tp} offset {offset}"
)
is_active = True
table = tp_to_table[tp]
offsets = active_offsets
if bufsize is None:
bufsize = buffer_sizes[tp] = table.recovery_buffer_size
active_events_received_at[tp] = now
elif tp in standby_tps:
logger.info(
f"Processing changelog standby event on {tp} offset {offset}"
)
table = tp_to_table[tp]
offsets = standby_offsets
if bufsize is None:
Expand Down Expand Up @@ -782,7 +781,7 @@ def _maybe_signal_recovery_end() -> None:
_maybe_signal_recovery_end()

if not self.standby_remaining_total():
logger.info('Completed standby partition fetch')
logger.debug("Completed standby partition fetch")
if self._standbys_span:
finish_span(self._standbys_span)
self._standbys_span = None
Expand Down Expand Up @@ -825,15 +824,11 @@ def standby_remaining(self) -> Counter[TP]:

def active_remaining_total(self) -> int:
"""Return number of changes remaining for actives to be up-to-date."""
var = sum(self.active_remaining().values())
logger.debug(f"Recovery still need {var} active offsets")
return var
return sum(self.active_remaining().values())

def standby_remaining_total(self) -> int:
"""Return number of changes remaining for standbys to be up-to-date."""
var = sum(self.standby_remaining().values())
logger.debug(f"Recovery still need {var} standby offsets")
return var
return sum(self.standby_remaining().values())

def active_stats(self) -> RecoveryStatsMapping:
"""Return current active recovery statistics."""
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ def test_start_coordinator_span(self, *, cthread):
def test_close(self, *, cthread, _consumer):
cthread._consumer = _consumer
cthread.close()
_consumer._closed is True
assert _consumer._closed
_consumer._coordinator.close.assert_called_once_with()

def test_close__no_consumer(self, *, cthread):
Expand Down

0 comments on commit 107142c

Please sign in to comment.