Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Spread out sending device lists to remote hosts #12132

Merged
merged 4 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12132.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of logging in for large accounts.
2 changes: 1 addition & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def send_presence_to_destinations(

self.notifier.on_new_replication_data()

def send_device_messages(self, destination: str) -> None:
def send_device_messages(self, destination: str, immediate: bool = False) -> None:
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
Expand Down
26 changes: 17 additions & 9 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ def build_and_send_edu(
raise NotImplementedError()

@abc.abstractmethod
def send_device_messages(self, destination: str) -> None:
def send_device_messages(self, destination: str, immediate: bool = True) -> None:
"""Tells the sender that a new device message is ready to be sent to the
destination. The `immediate` flag specifies whether the messages should
be tried to be sent immediately, or whether it can be delayed for a
short while (to aid performance).
"""
raise NotImplementedError()

@abc.abstractmethod
Expand Down Expand Up @@ -146,9 +151,8 @@ async def get_replication_rows(


@attr.s
class _PresenceQueue:
"""A queue of destinations that need to be woken up due to new presence
updates.
class _DestinationWakeupQueue:
"""A queue of destinations that need to be woken up due to new updates.

Staggers waking up of per destination queues to ensure that we don't attempt
to start TLS connections with many hosts all at once, leading to pinned CPU.
Expand All @@ -175,7 +179,7 @@ def add_to_queue(self, destination: str) -> None:
if not self.processing:
self._handle()

@wrap_as_background_process("_PresenceQueue.handle")
@wrap_as_background_process("_DestinationWakeupQueue.handle")
async def _handle(self) -> None:
"""Background process to drain the queue."""

Expand Down Expand Up @@ -297,7 +301,7 @@ def __init__(self, hs: "HomeServer"):

self._external_cache = hs.get_external_cache()

self._presence_queue = _PresenceQueue(self, self.clock)
self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)

def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
Expand Down Expand Up @@ -614,7 +618,7 @@ def send_presence_to_destinations(
states, start_loop=False
)

self._presence_queue.add_to_queue(destination)
self._destination_wakeup_queue.add_to_queue(destination)

def build_and_send_edu(
self,
Expand Down Expand Up @@ -667,7 +671,7 @@ def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
else:
queue.send_edu(edu)

def send_device_messages(self, destination: str) -> None:
def send_device_messages(self, destination: str, immediate: bool = False) -> None:
if destination == self.server_name:
logger.warning("Not sending device update to ourselves")
return
Expand All @@ -677,7 +681,11 @@ def send_device_messages(self, destination: str) -> None:
):
return

self._get_per_destination_queue(destination).attempt_new_transaction()
if immediate:
self._get_per_destination_queue(destination).attempt_new_transaction()
else:
self._get_per_destination_queue(destination).mark_new_data()
self._destination_wakeup_queue.add_to_queue(destination)

def wake_destination(self, destination: str) -> None:
"""Called when we want to retry sending transactions to a remote.
Expand Down
10 changes: 10 additions & 0 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ def send_edu(self, edu: Edu) -> None:
self._pending_edus.append(edu)
self.attempt_new_transaction()

def mark_new_data(self) -> None:
"""Marks that the destination has new data to send, without starting a
new transaction.

If a transaction loop is already in progress then a new transcation will
be attempted when the current one finishes.
"""

self._new_data_to_send = True

def attempt_new_transaction(self) -> None:
"""Try to start a new transaction to this destination

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ async def notify_device_update(
"Sending device list update notif for %r to: %r", user_id, hosts
)
for host in hosts:
self.federation_sender.send_device_messages(host)
self.federation_sender.send_device_messages(host, immediate=False)
log_kv({"message": "sent device update to host", "host": host})

async def notify_user_signature_update(
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ async def process_replication_rows(
# changes.
hosts = {row.entity for row in rows if not row.entity.startswith("@")}
for host in hosts:
self.federation_sender.send_device_messages(host)
self.federation_sender.send_device_messages(host, immediate=False)

elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local
Expand Down
52 changes: 48 additions & 4 deletions tests/federation/test_federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,12 @@ def test_send_device_updates(self):
self.assertEqual(len(self.edus), 1)
stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# a second call should produce no new device EDUs
self.hs.get_federation_sender().send_device_messages("host2")
self.pump()
self.assertEqual(self.edus, [])

# a second device
Expand Down Expand Up @@ -232,6 +235,10 @@ def test_upload_signatures(self):
device1_signing_key = self.generate_and_upload_device_signing_key(u1, "D1")
device2_signing_key = self.generate_and_upload_device_signing_key(u1, "D2")

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# expect two more edus
self.assertEqual(len(self.edus), 2)
stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", stream_id)
Expand Down Expand Up @@ -265,6 +272,10 @@ def test_upload_signatures(self):
e2e_handler.upload_signing_keys_for_user(u1, cross_signing_keys)
)

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# expect signing key update edu
self.assertEqual(len(self.edus), 2)
self.assertEqual(self.edus.pop(0)["edu_type"], "m.signing_key_update")
Expand All @@ -284,6 +295,10 @@ def test_upload_signatures(self):
)
self.assertEqual(ret["failures"], {})

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# expect two edus, in one or two transactions. We don't know what order the
# devices will be updated.
self.assertEqual(len(self.edus), 2)
Expand All @@ -307,6 +322,10 @@ def test_delete_devices(self):
self.login("user", "pass", device_id="D2")
self.login("user", "pass", device_id="D3")

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# expect three edus
self.assertEqual(len(self.edus), 3)
stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
Expand All @@ -318,6 +337,10 @@ def test_delete_devices(self):
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
)

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# expect three edus, in an unknown order
self.assertEqual(len(self.edus), 3)
for edu in self.edus:
Expand Down Expand Up @@ -350,12 +373,19 @@ def test_unreachable_server(self):
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
)

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

self.assertGreaterEqual(mock_send_txn.call_count, 4)

# recover the server
mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2")
self.pump()

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# for each device, there should be a single update
self.assertEqual(len(self.edus), 3)
Expand Down Expand Up @@ -390,6 +420,10 @@ def test_prune_outbound_device_pokes1(self):
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
)

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

self.assertGreaterEqual(mock_send_txn.call_count, 4)

# run the prune job
Expand All @@ -401,7 +435,10 @@ def test_prune_outbound_device_pokes1(self):
# recover the server
mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2")
self.pump()

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# there should be a single update for this user.
self.assertEqual(len(self.edus), 1)
Expand Down Expand Up @@ -435,6 +472,10 @@ def test_prune_outbound_device_pokes2(self):
self.login("user", "pass", device_id="D2")
self.login("user", "pass", device_id="D3")

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# delete them again
self.get_success(
self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
Expand All @@ -451,7 +492,10 @@ def test_prune_outbound_device_pokes2(self):
# recover the server
mock_send_txn.side_effect = self.record_transaction
self.hs.get_federation_sender().send_device_messages("host2")
self.pump()

# We queue up device list updates to be sent over federation, so we
# advance to clear the queue.
self.reactor.advance(1)

# ... and we should get a single update for this user.
self.assertEqual(len(self.edus), 1)
Expand Down