Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Process previously failed backfill events in the background
Browse files Browse the repository at this point in the history
Fix #13623

Follow-up to #13621 and #13622

Part of making `/messages` faster: #13356
  • Loading branch information
MadLittleMods committed May 12, 2023
1 parent def4804 commit fd26164
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
35 changes: 35 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,41 @@ async def _process_pulled_event(
)
return

# Check if we've already tried to process this event
failed_pull_attempt_info = await self._store.get_event_failed_pull_attempt_info(
event.room_id, event_id
)
if failed_pull_attempt_info:
# Process previously failed backfill events in the background
# to not waste something that is bound to fail again.
run_as_background_process(
"_try_process_pulled_event",
self._try_process_pulled_event,
origin,
event,
backfilled,
)
else:
# Otherwise, we can optimistically try to process and wait for the event to
# be fully persisted.
await self._try_process_pulled_event(origin, event, backfilled)

async def _try_process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
"""
Handles all of the async tasks necessary to process a pulled event. You should
not use this method directly, instead use `_process_pulled_event` which will
handle all of the quick sync checks that should happen before-hand.
Params:
origin: The server we received this event from
events: The received event
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
event_id = event.event_id

try:
try:
context = await self._compute_event_context_with_maybe_missing_prevs(
Expand Down
31 changes: 31 additions & 0 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ class BackfillQueueNavigationItem:
type: str


@attr.s(frozen=True, slots=True, auto_attribs=True)
class EventFailedPullAttemptInfo:
event_id: str
room_id: str
num_attempts: int
last_attempt_ts: int
last_cause: str


class _NoChainCoverIndex(Exception):
def __init__(self, room_id: str):
super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
Expand Down Expand Up @@ -1583,6 +1592,28 @@ def _record_event_failed_pull_attempt_upsert_txn(

txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

@trace
async def get_event_failed_pull_attempt_info(
self,
room_id: str,
event_id: str,
) -> Optional[EventFailedPullAttemptInfo]:
res = await self.db_pool.simple_select_one(
table="event_failed_pull_attempts",
keyvalues={"room_id": room_id, "event_id": event_id},
retcols=["num_attempts", "last_attempt_ts", "last_cause"],
allow_none=True,
desc="get_event_failed_pull_attempt_info",
)

return EventFailedPullAttemptInfo(
event_id=event_id,
room_id=room_id,
num_attempts=res["num_attempts"],
last_attempt_ts=res["last_attempt_ts"],
last_cause=res["last_cause"],
)

@trace
async def get_event_ids_to_not_pull_from_backoff(
self,
Expand Down

0 comments on commit fd26164

Please sign in to comment.