Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Requeue events for sending if transaction fails
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Mar 10, 2021
1 parent d6effcf commit d0bf228
Showing 1 changed file with 40 additions and 13 deletions.
53 changes: 40 additions & 13 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,23 @@
# limitations under the License.
import datetime
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
from collections import deque
from typing import (
TYPE_CHECKING,
Dict,
Hashable,
Iterable,
Iterator,
List,
Optional,
Tuple,
TypeVar,
cast,
)

import attr
from prometheus_client import Counter
from typing_extensions import Deque

from synapse.api.errors import (
FederationDeniedError,
Expand Down Expand Up @@ -113,8 +126,8 @@ def __init__(
# destination (we are the only updater so this is safe)
self._last_successful_stream_ordering = None # type: Optional[int]

# a list of pending PDUs
self._pending_pdus = [] # type: List[EventBase]
# a queue of pending PDUs
self._pending_pdus = deque() # type: Deque[EventBase]

# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
Expand Down Expand Up @@ -500,7 +513,7 @@ def _start_catching_up(self) -> None:
This throws away the PDU queue.
"""
self._catching_up = True
self._pending_pdus = []
self._pending_pdus = deque()


@attr.s(slots=True)
Expand All @@ -514,6 +527,7 @@ class _TransactionQueueManager:
_device_stream_id = attr.ib(type=Optional[int], default=None)
_device_list_id = attr.ib(type=Optional[int], default=None)
_last_stream_ordering = attr.ib(type=Optional[int], default=None)
_pdus = attr.ib(type=List[EventBase], factory=list)

async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# We have to keep 2 free slots for presence and rr_edus
Expand Down Expand Up @@ -542,10 +556,8 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:

pending_edus = device_update_edus + to_device_edus

pending_pdus = self.queue._pending_pdus

# We can only include at most 50 PDUs per transactions
pending_pdus, self.queue._pending_pdus = pending_pdus[:50], pending_pdus[50:]
# Get up to 50 PDUs from the queue
self._pdus = list(_popleft_upto_n_items_deque(self.queue._pending_pdus, 50))

pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
pending_presence = self.queue._pending_presence
Expand Down Expand Up @@ -577,25 +589,29 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
_, val = self.queue._pending_edus_keyed.popitem()
pending_edus.append(val)

if not pending_pdus and not pending_edus:
if not self._pdus and not pending_edus:
return [], []

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self.queue._get_rr_edus(force_flush=True))

if pending_pdus:
self._last_stream_ordering = pending_pdus[
if self._pdus:
self._last_stream_ordering = self._pdus[
-1
].internal_metadata.stream_ordering
assert self._last_stream_ordering

return pending_pdus, pending_edus
return self._pdus, pending_edus

async def __aexit__(self, exc_type, exc, tb):
if exc_type is not None:
# Failed to send transaction, nothing to do.
# Failed to send transaction. Requeue events we failed to send this
# time round.
if self._pdus:
self.queue._pending_pdus.extendleft(self._pdus)

return

# Succeeded to send the transaction so we record where we have sent up
Expand Down Expand Up @@ -623,3 +639,14 @@ async def __aexit__(self, exc_type, exc, tb):
await self.queue._store.set_destination_last_successful_stream_ordering(
self.queue._destination, self._last_stream_ordering
)


T = TypeVar("T")


def _popleft_upto_n_items_deque(d: Deque[T], n: int) -> Iterator[T]:
"Pops upto N items from the left of the deque."

while d and n:
yield d.popleft()
n -= 1

0 comments on commit d0bf228

Please sign in to comment.