Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Catch-up after Federation Outage (split, 1) #8230

Merged
merged 18 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8230.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Track the latest event for every destination and room for catch-up after federation outage.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once you land the other parts of this feature, it'll be good to update this so that the changelog entries get combined.

11 changes: 9 additions & 2 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async def handle_event(event: EventBase) -> None:
logger.debug("Sending %s to %r", event, destinations)

if destinations:
self._send_pdu(event, destinations)
await self._send_pdu(event, destinations)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
Expand Down Expand Up @@ -267,7 +267,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
finally:
self._is_processing = False

def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
Expand All @@ -285,6 +285,13 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()

# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
await self.store.store_destination_rooms_entries(
destinations, pdu.room_id, pdu.internal_metadata.stream_ordering,
)

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)

Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ def simple_upsert_many_txn(
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[str]],
value_values: Iterable[Iterable[Any]],
) -> None:
"""
Upsert, many times.
Expand Down Expand Up @@ -970,7 +970,7 @@ def simple_upsert_many_txn_emulated(
key_names: Iterable[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[str]],
value_values: Iterable[Iterable[Any]],
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
# room_depth
# state_groups
# state_groups_state
# destination_rooms

# we will build a temporary table listing the events so that we don't
# have to keep shovelling the list back and forth across the
Expand Down Expand Up @@ -336,6 +337,7 @@ def _purge_room_txn(self, txn, room_id):
# and finally, the tables with an index on room_id (or no useful index)
for table in (
"current_state_events",
"destination_rooms",
"event_backward_extremities",
"event_forward_extremities",
"event_json",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This schema delta alters the schema to enable 'catching up' remote homeservers
-- after there has been a connectivity problem for any reason.

-- This stores, for each (destination, room) pair, the stream_ordering of the
-- latest event for that destination.
CREATE TABLE IF NOT EXISTS destination_rooms (
-- the destination in question.
destination TEXT NOT NULL REFERENCES destinations (destination),
-- the ID of the room in question
room_id TEXT NOT NULL REFERENCES rooms (room_id),
-- the stream_ordering of the event
stream_ordering INTEGER NOT NULL,
PRIMARY KEY (destination, room_id)
-- We don't declare a foreign key on stream_ordering here because that'd mean
-- we'd need to either maintain an index (expensive) or do a table scan of
-- destination_rooms whenever we delete an event (also potentially expensive).
-- In addition to that, a foreign key on stream_ordering would be redundant
-- as this row doesn't need to refer to a specific event; if the event gets
-- deleted then it doesn't affect the validity of the stream_ordering here.
);

-- This index is needed to make it so that a deletion of a room (in the rooms
-- table) can be efficient, as otherwise a table scan would need to be performed
-- to check that no destination_rooms rows point to the room to be deleted.
-- Also: it makes it efficient to delete all the entries for a given room ID,
-- such as when purging a room.
CREATE INDEX IF NOT EXISTS destination_rooms_room_id
ON destination_rooms (room_id);
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
66 changes: 63 additions & 3 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@

import logging
from collections import namedtuple
from typing import Optional, Tuple
from typing import Iterable, Optional, Tuple

from canonicaljson import encode_canonical_json

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache

Expand Down Expand Up @@ -164,7 +165,9 @@ def _get_destination_retry_timings(self, txn, destination):
allow_none=True,
)

if result and result["retry_last_ts"] > 0:
# check we have a row and retry_last_ts is not null or zero
# (retry_last_ts can't be negative)
if result and result["retry_last_ts"]:
return result
else:
return None
Expand Down Expand Up @@ -273,3 +276,60 @@ def _cleanup_transactions_txn(txn):
await self.db_pool.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)

async def store_destination_rooms_entries(
self, destinations: Iterable[str], room_id: str, stream_ordering: int,
) -> None:
"""
Updates or creates `destination_rooms` entries in batch for a single event.

Args:
destinations: list of destinations
room_id: the room_id of the event
stream_ordering: the stream_ordering of the event
"""

return await self.db_pool.runInteraction(
"store_destination_rooms_entries",
self._store_destination_rooms_entries_txn,
destinations,
room_id,
stream_ordering,
)

def _store_destination_rooms_entries_txn(
self,
txn: LoggingTransaction,
destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:

# ensure we have a `destinations` row for this destination, as there is
# a foreign key constraint.
if isinstance(self.database_engine, PostgresEngine):
q = """
INSERT INTO destinations (destination)
VALUES (?)
ON CONFLICT DO NOTHING;
"""
elif isinstance(self.database_engine, Sqlite3Engine):
q = """
INSERT OR IGNORE INTO destinations (destination)
VALUES (?);
"""
else:
raise RuntimeError("Unknown database engine")

txn.execute_batch(q, ((destination,) for destination in destinations))

rows = [(destination, room_id) for destination in destinations]

self.db_pool.simple_upsert_many_txn(
txn,
"destination_rooms",
["destination", "room_id"],
rows,
["stream_ordering"],
[(stream_ordering,)] * len(rows),
)
82 changes: 82 additions & 0 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from mock import Mock

from synapse.rest import admin
from synapse.rest.client.v1 import login, room

from tests.test_utils import event_injection, make_awaitable
from tests.unittest import FederatingHomeserverTestCase, override_config


class FederationCatchUpTestCases(FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def make_homeserver(self, reactor, clock):
return self.setup_test_homeserver(
federation_transport_client=Mock(spec=["send_transaction"]),
)

def prepare(self, reactor, clock, hs):
# stub out get_current_hosts_in_room
state_handler = hs.get_state_handler()

# This mock is crucial for destination_rooms to be populated.
state_handler.get_current_hosts_in_room = Mock(
return_value=make_awaitable(["test", "host2"])
)

def get_destination_room(self, room: str, destination: str = "host2") -> dict:
"""
Gets the destination_rooms entry for a (destination, room_id) pair.

Args:
room: room ID
destination: what destination, default is "host2"

Returns:
Dictionary of { event_id: str, stream_ordering: int }
"""
event_id, stream_ordering = self.get_success(
self.hs.get_datastore().db_pool.execute(
"test:get_destination_rooms",
None,
"""
SELECT event_id, stream_ordering
FROM destination_rooms dr
JOIN events USING (stream_ordering)
WHERE dr.destination = ? AND dr.room_id = ?
""",
destination,
room,
)
)[0]
return {"event_id": event_id, "stream_ordering": stream_ordering}

@override_config({"send_federation": True})
def test_catch_up_destination_rooms_tracking(self):
"""
Tests that we populate the `destination_rooms` table as needed.
"""
self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room = self.helper.create_room_as("u1", tok=u1_token)

self.get_success(
event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
)

event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]

row_1 = self.get_destination_room(room)

event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]

row_2 = self.get_destination_room(room)

# check: events correctly registered in order
self.assertEqual(row_1["event_id"], event_id_1)
self.assertEqual(row_2["event_id"], event_id_2)
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)