Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fonx concurrent transactions
Browse files Browse the repository at this point in the history
If more transactions arrive from an origin while we're still processing the
first one, reject them.

Hopefully a quick fix to #9489
  • Loading branch information
richvdh committed Mar 11, 2021
1 parent 8a8c5d4 commit 99c9e45
Showing 1 changed file with 36 additions and 22 deletions.
58 changes: 36 additions & 22 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
Codes,
FederationError,
IncompatibleRoomVersionError,
LimitExceededError,
NotFoundError,
SynapseError,
UnsupportedRoomVersionError,
Expand Down Expand Up @@ -112,10 +113,11 @@ def __init__(self, hs: "HomeServer"):
# with FederationHandlerRegistry.
hs.get_directory_handler()

self._federation_ratelimiter = hs.get_federation_ratelimiter()

self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")

# origins that we are currently processing a transaction from.
# a dict from origin to txn id.
self._active_transactions = {} # type: Dict[str, str]

# We cache results for transaction with the same ID
self._transaction_resp_cache = ResponseCache(
Expand Down Expand Up @@ -169,6 +171,26 @@ async def on_incoming_transaction(

logger.debug("[%s] Got transaction", transaction_id)

# we only process one transaction from each origin at a time. We need to do
# this check here, rather than in _on_incoming_transaction_inner so that we
# don't cache the rejection in _transaction_resp_cache (so that if the txn
# arrives again later, we can process it).
current_transaction = self._active_transactions.get(origin)
if current_transaction and current_transaction != transaction_id:
logger.warning(
"Received another txn %s from %s while still processing %s",
transaction_id,
origin,
current_transaction,
)
return 429, {
"errcode": Codes.LIMIT_EXCEEDED,
"error": "Too many concurrent transactions",
}

# CRITICAL SECTION: we must now not await until we populate _active_transactions
# in _on_incoming_transaction_inner.

# We wrap in a ResponseCache so that we de-duplicate retried
# transactions.
return await self._transaction_resp_cache.wrap(
Expand All @@ -182,26 +204,18 @@ async def on_incoming_transaction(
async def _on_incoming_transaction_inner(
self, origin: str, transaction: Transaction, request_time: int
) -> Tuple[int, Dict[str, Any]]:
# Use a linearizer to ensure that transactions from a remote are
# processed in order.
with await self._transaction_linearizer.queue(origin):
# We rate limit here *after* we've queued up the incoming requests,
# so that we don't fill up the ratelimiter with blocked requests.
#
# This is important as the ratelimiter allows N concurrent requests
# at a time, and only starts ratelimiting if there are more requests
# than that being processed at a time. If we queued up requests in
# the linearizer/response cache *after* the ratelimiting then those
# queued up requests would count as part of the allowed limit of N
# concurrent requests.
with self._federation_ratelimiter.ratelimit(origin) as d:
await d

result = await self._handle_incoming_transaction(
origin, transaction, request_time
)
# CRITICAL SECTION: the first thing we must do (before awaiting) is
# add an entry to _active_transactions.
assert origin not in self._active_transactions
self._active_transactions[origin] = transaction.transaction_id

return result
try:
result = await self._handle_incoming_transaction(
origin, transaction, request_time
)
return result
finally:
del self._active_transactions[origin]

async def _handle_incoming_transaction(
self, origin: str, transaction: Transaction, request_time: int
Expand Down

0 comments on commit 99c9e45

Please sign in to comment.