Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fed server: use a linearizer for ongoing transactions #2518

Merged
merged 1 commit into from
Oct 10, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()

self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")

# We cache responses to state queries, as they take a while and often
# come in waves.
Expand Down Expand Up @@ -111,12 +112,39 @@ def on_backfill_request(self, origin, room_id, versions, limit):
def on_incoming_transaction(self, transaction_data):
# keep this as early as possible to make the calculated origin ts as
# accurate as possible.
request_time = int(self._clock.time_msec())
request_time = self._clock.time_msec()

transaction = Transaction(**transaction_data)

if not transaction.transaction_id:
raise Exception("Transaction missing transaction_id")
if not transaction.origin:
raise Exception("Transaction missing origin")

logger.debug("[%s] Got transaction", transaction.transaction_id)

# use a linearizer to ensure that we don't process the same transaction
# multiple times in parallel.
with (yield self._transaction_linearizer.queue(
(transaction.origin, transaction.transaction_id),
)):
result = yield self._handle_incoming_transaction(
transaction, request_time,
)

defer.returnValue(result)

@defer.inlineCallbacks
def _handle_incoming_transaction(self, transaction, request_time):
""" Process an incoming transaction and return the HTTP response

Args:
transaction (Transaction): incoming transaction
request_time (int): timestamp that the HTTP request arrived at

Returns:
Deferred[(int, object)]: http response code and body
"""
response = yield self.transaction_actions.have_responded(transaction)

if response:
Expand Down