From 99c9e4561947b7a6dfc13599c3e3b2e2b0a02aa6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 11 Mar 2021 16:47:09 +0000 Subject: [PATCH] Fonx concurrent transactions If more transactions arrive from an origin while we're still processing the first one, reject them. Hopefully a quick fix to https://github.com/matrix-org/synapse/issues/9489 --- synapse/federation/federation_server.py | 58 +++++++++++++++---------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 06c5e7a9e0f3..02ed500e99d1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -41,6 +41,7 @@ Codes, FederationError, IncompatibleRoomVersionError, + LimitExceededError, NotFoundError, SynapseError, UnsupportedRoomVersionError, @@ -112,10 +113,11 @@ def __init__(self, hs: "HomeServer"): # with FederationHandlerRegistry. hs.get_directory_handler() - self._federation_ratelimiter = hs.get_federation_ratelimiter() - self._server_linearizer = Linearizer("fed_server") - self._transaction_linearizer = Linearizer("fed_txn_handler") + + # origins that we are currently processing a transaction from. + # a dict from origin to txn id. + self._active_transactions = {} # type: Dict[str, str] # We cache results for transaction with the same ID self._transaction_resp_cache = ResponseCache( @@ -169,6 +171,26 @@ async def on_incoming_transaction( logger.debug("[%s] Got transaction", transaction_id) + # we only process one transaction from each origin at a time. We need to do + # this check here, rather than in _on_incoming_transaction_inner so that we + # don't cache the rejection in _transaction_resp_cache (so that if the txn + # arrives again later, we can process it). + current_transaction = self._active_transactions.get(origin) + if current_transaction and current_transaction != transaction_id: + logger.warning( + "Received another txn %s from %s while still processing %s", + transaction_id, + origin, + current_transaction, + ) + return 429, { + "errcode": Codes.LIMIT_EXCEEDED, + "error": "Too many concurrent transactions", + } + + # CRITICAL SECTION: we must now not await until we populate _active_transactions + # in _on_incoming_transaction_inner. + # We wrap in a ResponseCache so that we de-duplicate retried # transactions. return await self._transaction_resp_cache.wrap( @@ -182,26 +204,18 @@ async def on_incoming_transaction( async def _on_incoming_transaction_inner( self, origin: str, transaction: Transaction, request_time: int ) -> Tuple[int, Dict[str, Any]]: - # Use a linearizer to ensure that transactions from a remote are - # processed in order. - with await self._transaction_linearizer.queue(origin): - # We rate limit here *after* we've queued up the incoming requests, - # so that we don't fill up the ratelimiter with blocked requests. - # - # This is important as the ratelimiter allows N concurrent requests - # at a time, and only starts ratelimiting if there are more requests - # than that being processed at a time. If we queued up requests in - # the linearizer/response cache *after* the ratelimiting then those - # queued up requests would count as part of the allowed limit of N - # concurrent requests. - with self._federation_ratelimiter.ratelimit(origin) as d: - await d - - result = await self._handle_incoming_transaction( - origin, transaction, request_time - ) + # CRITICAL SECTION: the first thing we must do (before awaiting) is + # add an entry to _active_transactions. + assert origin not in self._active_transactions + self._active_transactions[origin] = transaction.transaction_id - return result + try: + result = await self._handle_incoming_transaction( + origin, transaction, request_time + ) + return result + finally: + del self._active_transactions[origin] async def _handle_incoming_transaction( self, origin: str, transaction: Transaction, request_time: int