
Experimental Federation Speedup #9702

Merged

merged 27 commits into develop from experimental-fed-speedup on Apr 14, 2021
Changes from 12 commits

Commits (27)
588631f
add experimental federation speedup
ShadowJonathan Mar 26, 2021
5ea99b1
news and minor fixes
ShadowJonathan Mar 26, 2021
79a5fd9
fix Collection errors
ShadowJonathan Mar 26, 2021
a30d979
swiftly pet isort
ShadowJonathan Mar 26, 2021
027a574
2 -> len
ShadowJonathan Mar 26, 2021
daa9712
🪄
ShadowJonathan Mar 31, 2021
831ceda
Merge remote-tracking branch 'origin/develop' into experimental-fed-s…
ShadowJonathan Mar 31, 2021
2506f63
word
ShadowJonathan Mar 31, 2021
73e2c78
news
ShadowJonathan Mar 31, 2021
44fcea4
event_to_dests naming
ShadowJonathan Mar 31, 2021
30e3338
split handle_room_events comprehensions to make it more readable
ShadowJonathan Mar 31, 2021
fbfcadb
actually fix async bug
ShadowJonathan Mar 31, 2021
fc7e650
naming again
ShadowJonathan Mar 31, 2021
16c39da
Apply suggestions from code review
ShadowJonathan Mar 31, 2021
621d637
apply suggestions from feedback
ShadowJonathan Mar 31, 2021
632be5c
more feedback suggestions
ShadowJonathan Mar 31, 2021
43f9e28
even more feedback suggestions
ShadowJonathan Mar 31, 2021
dc6730a
fix room_and_destination_to_ordering
ShadowJonathan Mar 31, 2021
ad796a3
apply feedback suggestions
ShadowJonathan Apr 2, 2021
e7e6c67
Merge remote-tracking branch 'origin/develop' into experimental-fed-s…
ShadowJonathan Apr 2, 2021
9165cb5
wording
ShadowJonathan Apr 2, 2021
bbf52d5
more wording
ShadowJonathan Apr 2, 2021
7a33b4c
Apply suggestions from code review
ShadowJonathan Apr 9, 2021
a89cd8e
Apply suggestions from code review (manually)
ShadowJonathan Apr 9, 2021
4551879
Merge branch 'develop' into experimental-fed-speedup
ShadowJonathan Apr 9, 2021
95ceb32
Apply suggestions from code review (manually) #2
ShadowJonathan Apr 9, 2021
5703cd0
Update 9702.misc
richvdh Apr 14, 2021
1 change: 1 addition & 0 deletions changelog.d/9702.misc
@@ -0,0 +1 @@
Speed up federation by using fewer database calls.
111 changes: 72 additions & 39 deletions synapse/federation/sender/__init__.py
@@ -40,7 +40,7 @@
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func

if TYPE_CHECKING:
@@ -275,15 +275,17 @@ async def _process_event_queue_loop(self) -> None:
if not events and next_token >= self._last_poked_id:
break

async def handle_event(event: EventBase) -> None:
async def handle_event(
event: EventBase,
) -> Optional[Tuple[EventBase, Collection[str]]]:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None:
return
return None

if not event.internal_metadata.should_proactively_send():
return
return None

destinations = None # type: Optional[Set[str]]
if not event.prev_event_ids():
@@ -318,7 +320,7 @@ async def handle_event(event: EventBase) -> None:
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return
return None

destinations = {
d
@@ -328,6 +330,8 @@ async def handle_event(event: EventBase) -> None:
)
}

destinations.discard(self.server_name)

if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server
# then it already has the event and there is no reason to
@@ -337,34 +341,84 @@ async def handle_event(event: EventBase) -> None:
logger.debug("Sending %s to %r", event, destinations)

if destinations:
await self._send_pdu(event, destinations)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)

synapse.metrics.event_processing_lag_by_event.labels(
"federation_sender"
).observe((now - ts) / 1000)

async def handle_room_events(events: Iterable[EventBase]) -> None:
return event, destinations
return None

async def handle_room_events(
room_id: str, events: Iterable[EventBase]
) -> Tuple[str, Dict[EventBase, Collection[str]]]:
with Measure(self.clock, "handle_room_events"):
for event in events:
await handle_event(event)
# Handle all events, skip if handle_event returns None
handled_events = (
ret
for ret in [await handle_event(event) for event in events]
@ShadowJonathan (author) commented on Mar 31, 2021:

Small warning: if we change this `[]` comprehension into a generator (`()`), Python will start throwing errors about async iterators, because to work with an async iterator/generator in a sub-comprehension, you have to prefix every `for` with `async for`.

I had split it before I realised that didn't solve the problem; I then set this small inner generator back to `[]`, which fixed it, but I kept the split.

This is a small memory tradeoff for readability: it creates a list of `len(events)` entries, but the surrounding generator then filters out all `None` values, and itself takes up no space until it is consumed by the dict comprehension.

So this is a `len(events)`-long list of `Optional[Tuple[EventBase, Collection[str]]]`, then comprehended down into a `Dict[EventBase, Collection[str]]`.
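To make the comment concrete, here is a minimal, self-contained sketch (hypothetical `handle` coroutine standing in for `handle_event`; not Synapse code) of why the inner comprehension must stay a list:

```python
import asyncio
from typing import Optional

async def handle(x: int) -> Optional[int]:
    # Stand-in for handle_event: returns None for inputs that should be skipped.
    return x if x % 2 else None

async def main() -> None:
    xs = range(5)

    # Inner *list* comprehension: the awaits run eagerly, producing a plain
    # list that the outer (synchronous) generator can filter lazily.
    handled = (r for r in [await handle(x) for x in xs] if r is not None)
    print(list(handled))  # [1, 3]

    # Turning the inner comprehension into a generator makes it an *async*
    # generator (it contains an await), and consuming that with a plain
    # `for` fails with a TypeError about async iteration; every enclosing
    # `for` would have to become `async for`:
    #
    # broken = (r for r in (await handle(x) for x in xs) if r is not None)

asyncio.run(main())
```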

if ret is not None
)

# Generate event -> destination pairs for every handled event
return room_id, {
event: dests for event, dests in handled_events
}

events_by_room = {} # type: Dict[str, List[EventBase]]
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)

await make_deferred_yieldable(
per_room_events_and_dests = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(handle_room_events, evs)
for evs in events_by_room.values()
run_in_background(handle_room_events, rid, evs)
for rid, evs in events_by_room.items()
],
consumeErrors=True,
)
) # type: List[Tuple[str, Dict[EventBase, Collection[str]]]]

# Tuples of room_id + destination to their max-seen stream_ordering
room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int]

# List of events to send to each destination
event_to_dests = {} # type: Dict[str, List[EventBase]]

# For each roomID + events with destinations pair...
for room_id, event_and_dests in per_room_events_and_dests:
# ...get every event with their destinations...
for event, destinations in event_and_dests.items():

# (we got this from the database, it's filled)
assert event.internal_metadata.stream_ordering

# ...iterate over those destinations..
for destination in destinations:
if (
room_with_dest_stream_ordering.setdefault(
(room_id, destination), 0
)
< event.internal_metadata.stream_ordering
):
# ...update their stream-ordering...
room_with_dest_stream_ordering[
(room_id, destination)
] = event.internal_metadata.stream_ordering

# ...and add the event to each destination queue.
event_to_dests.setdefault(destination, []).append(event)

# Bulk-store destination_rooms stream_ids
await self.store.bulk_store_destination_rooms_entries(
room_with_dest_stream_ordering
)

# Send corresponding events to each destination queue
self._distribute_pdus(event_to_dests)
@ShadowJonathan (author) commented:
Distribution happens after database access: that way, a database-access crash loop causes no events to be spammed to other servers. A crash loop will also cause federation to stall, which is much more noticeable in the logs (as only the errors will be visible).

Also, I think it's "safer" not to have a race condition where `destination_rooms` values are set after PDUs have actually been distributed.

Anyhow, if either of these functions throws, we won't reach `update_federation_out_pos`, and the loop will come around again with `finally:` and the next poke from the database sequence.
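A simplified control-flow sketch of that ordering (the three calls are the ones from this diff; the wrapper function itself is hypothetical glue for illustration):

```python
# Sketch only: `store` and `sender` stand for the real SQL store and
# FederationSender instances.
async def persist_then_distribute(store, sender, next_token,
                                  orderings, event_to_dests):
    # 1. Persist (room_id, destination) -> stream_ordering first. If this
    #    raises, nothing has been handed to the per-destination queues, so
    #    a database crash loop stalls federation (loud in the logs) rather
    #    than re-spamming remote servers.
    await store.bulk_store_destination_rooms_entries(orderings)

    # 2. Only then enqueue the PDUs, so the stored catch-up state can never
    #    lag behind what was actually sent.
    sender._distribute_pdus(event_to_dests)

    # 3. Advance the out position last; any exception above skips this, and
    #    the outer loop retries the same batch on the next poke.
    await store.update_federation_out_pos("events", next_token)
```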


await self.store.update_federation_out_pos("events", next_token)

if events:
@@ -393,34 +447,13 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
finally:
self._is_processing = False

async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
def _distribute_pdus(self, destination_to_pdus: Dict[str, List[EventBase]]) -> None:
for destination, pdus in destination_to_pdus.items():
logger.debug("Sending %d to: %s", len(pdus), destination)

destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
sent_pdus_destination_dist_total.inc(len(pdus))

if not destinations:
return

sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()

assert pdu.internal_metadata.stream_ordering

# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
await self.store.store_destination_rooms_entries(
destinations,
pdu.room_id,
pdu.internal_metadata.stream_ordering,
)

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)
self._get_per_destination_queue(destination).send_pdu(*pdus)

async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
11 changes: 7 additions & 4 deletions synapse/federation/sender/per_destination_queue.py
@@ -154,7 +154,7 @@ def pending_edu_count(self) -> int:
+ len(self._pending_edus_keyed)
)

def send_pdu(self, pdu: EventBase) -> None:
def send_pdu(self, *pdus: EventBase) -> None:
"""Add a PDU to the queue, and start the transmission loop if necessary

Args:
@@ -163,10 +163,13 @@ def send_pdu(self, pdu: EventBase) -> None:
if not self._catching_up or self._last_successful_stream_ordering is None:
# only enqueue the PDU if we are not catching up (False) or do not
# yet know if we have anything to catch up (None)
self._pending_pdus.append(pdu)
self._pending_pdus.extend(pdus)
else:
assert pdu.internal_metadata.stream_ordering
self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
self._catchup_last_skipped = max(
pdu.internal_metadata.stream_ordering
for pdu in pdus
if pdu.internal_metadata.stream_ordering is not None
)

self.attempt_new_transaction()
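As a toy illustration of the catch-up branch above (hypothetical `FakePdu`/`FakeMeta` stand-ins, not Synapse's real classes): while catching up, PDUs are not queued at all; they only advance the skip high-water mark.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class FakeMeta:
    stream_ordering: Optional[int]

@dataclass
class FakePdu:
    internal_metadata: FakeMeta

# Three skipped PDUs; the one with no stream ordering is ignored.
pdus = [FakePdu(FakeMeta(5)), FakePdu(FakeMeta(None)), FakePdu(FakeMeta(9))]

catchup_last_skipped = max(
    pdu.internal_metadata.stream_ordering
    for pdu in pdus
    if pdu.internal_metadata.stream_ordering is not None
)
assert catchup_last_skipped == 9  # the max-seen ordering among skipped PDUs
```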

28 changes: 12 additions & 16 deletions synapse/storage/databases/main/transactions.py
@@ -15,7 +15,7 @@

import logging
from collections import namedtuple
from typing import Iterable, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple

from canonicaljson import encode_canonical_json

@@ -296,37 +296,33 @@ def _set_destination_retry_timings_emulated(
},
)

async def store_destination_rooms_entries(
self,
destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:
async def bulk_store_destination_rooms_entries(
self, room_and_destination_to_ordering: Dict[Tuple[str, str], int]
) -> None:
"""
Updates or creates `destination_rooms` entries in batch for a single event.
Updates or creates `destination_rooms` entries in batch for many events.

Args:
destinations: list of destinations
room_id: the room_id of the event
stream_ordering: the stream_ordering of the event
room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id
"""

await self.db_pool.simple_upsert_many(
table="destinations",
key_names=("destination",),
key_values=[(d,) for d in destinations],
key_values=[(key[1],) for key in room_and_destination_to_ordering.keys()],
value_names=[],
value_values=[],
desc="store_destination_rooms_entries_dests",
)

rows = [(destination, room_id) for destination in destinations]
await self.db_pool.simple_upsert_many(
table="destination_rooms",
key_names=("destination", "room_id"),
key_values=rows,
key_names=("room_id", "destination"),
key_values=list(room_and_destination_to_ordering.keys()),
value_names=["stream_ordering"],
value_values=[(stream_ordering,)] * len(rows),
value_values=[
(stream_id,) for stream_id in room_and_destination_to_ordering.values()
],
desc="store_destination_rooms_entries_rooms",
)
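For illustration, a hypothetical input (made-up room and server names) showing how that mapping decomposes into the two `simple_upsert_many` calls above:

```python
# Made-up example data; the shape matches the function's argument.
room_and_destination_to_ordering = {
    ("!abc:a.example", "b.example"): 42,
    ("!abc:a.example", "c.example"): 45,
    ("!def:a.example", "b.example"): 43,
}

# First upsert: make sure each destination row exists. Note duplicates are
# possible, since a destination can appear under several rooms.
dest_keys = [(key[1],) for key in room_and_destination_to_ordering.keys()]
# -> [("b.example",), ("c.example",), ("b.example",)]

# Second upsert: (room_id, destination) keys paired positionally with their
# stream orderings (both iterate the same dict, so the rows stay aligned).
room_dest_keys = list(room_and_destination_to_ordering.keys())
stream_values = [(sid,) for sid in room_and_destination_to_ordering.values()]
# -> keys:   [("!abc:a.example", "b.example"), ("!abc:a.example", "c.example"),
#             ("!def:a.example", "b.example")]
# -> values: [(42,), (45,), (43,)]
```

Batching everything into one mapping is what lets each table be written as a single upsert batch, instead of one `store_destination_rooms_entries` call per event.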
