diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 06343d40e44e..c7a8f41addb9 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -953,6 +953,41 @@ async def _process_pulled_event( ) return + # Check if we've already tried to process this event + failed_pull_attempt_info = await self._store.get_event_failed_pull_attempt_info( + event.room_id, event_id + ) + if failed_pull_attempt_info: + # Process previously failed backfill events in the background + # to not waste something that is bound to fail again. + run_as_background_process( + "_try_process_pulled_event", + self._try_process_pulled_event, + origin, + event, + backfilled, + ) + else: + # Otherwise, we can optimistically try to process and wait for the event to + # be fully persisted. + await self._try_process_pulled_event(origin, event, backfilled) + + async def _try_process_pulled_event( + self, origin: str, event: EventBase, backfilled: bool + ) -> None: + """ + Handles all of the async tasks necessary to process a pulled event. You should + not use this method directly, instead use `_process_pulled_event` which will + handle all of the quick sync checks that should happen before-hand. + + Params: + origin: The server we received this event from + events: The received event + backfilled: True if this is part of a historical batch of events (inhibits + notification to clients, and validation of device keys.) + """ + event_id = event.event_id + try: try: context = await self._compute_event_context_with_maybe_missing_prevs( diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ac19de183cb6..541432fc7c06 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -108,6 +108,15 @@ class BackfillQueueNavigationItem: type: str +@attr.s(frozen=True, slots=True, auto_attribs=True) +class EventFailedPullAttemptInfo: + event_id: str + room_id: str + num_attempts: int + last_attempt_ts: int + last_cause: str + + class _NoChainCoverIndex(Exception): def __init__(self, room_id: str): super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,)) @@ -1583,6 +1592,28 @@ def _record_event_failed_pull_attempt_upsert_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) + @trace + async def get_event_failed_pull_attempt_info( + self, + room_id: str, + event_id: str, + ) -> Optional[EventFailedPullAttemptInfo]: + res = await self.db_pool.simple_select_one( + table="event_failed_pull_attempts", + keyvalues={"room_id": room_id, "event_id": event_id}, + retcols=["num_attempts", "last_attempt_ts", "last_cause"], + allow_none=True, + desc="get_event_failed_pull_attempt_info", + ) + + return EventFailedPullAttemptInfo( + event_id=event_id, + room_id=room_id, + num_attempts=res["num_attempts"], + last_attempt_ts=res["last_attempt_ts"], + last_cause=res["last_cause"], + ) + @trace async def get_event_ids_to_not_pull_from_backoff( self,