This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Catch-up after Federation Outage (split, 1) #8230
Merged
Merged
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
e7866a5
Add query to insert destination_rooms entries.
reivilibre 9251ac2
Fix upsert type signatures.
reivilibre 8839eea
Create destination_rooms table.
reivilibre 27b5d29
Upsert destination_rooms entries when we send to destinations.
reivilibre e6671ae
Test for tracking destination_rooms
reivilibre 7893b14
Antilint
reivilibre a577751
Handle review
reivilibre a58ef8e
Arg, so YOU were the source of all my woes?
reivilibre ae92ef5
retry_last_ts is nullable, so handle it properly.
reivilibre 59417df
`destination_rooms` should be pruned on purging history
reivilibre d02e56b
Rewrite purge query
reivilibre 24da031
Query improvements
reivilibre 9c933b7
Remove needless check and fix type annotations
reivilibre f36855e
Delete destination_rooms entries when we purge from rooms
reivilibre 17b3f40
Rejig the schema delta a bit
reivilibre 91b0607
Reword query: use `room_id` to link to destination_rooms
reivilibre 5777854
Remove obsolete purge_events pruning of destination_rooms
reivilibre 4c47a1d
Add comments to schema delta
reivilibre File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Track the latest event for every destination and room for catch-up after federation outage. | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 34 additions & 0 deletions
34
synapse/storage/databases/main/schema/delta/58/15_catchup_destination_rooms.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* Copyright 2020 The Matrix.org Foundation C.I.C | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
-- This schema delta alters the schema to enable 'catching up' remote homeservers | ||
-- after there has been a connectivity problem for any reason. | ||
|
||
-- This stores, for each (destination, room) pair, the stream_ordering of the | ||
-- latest event for that destination. | ||
CREATE TABLE IF NOT EXISTS destination_rooms ( | ||
-- the destination in question. | ||
destination TEXT NOT NULL REFERENCES destinations (destination), | ||
-- the ID of the room in question | ||
room_id TEXT NOT NULL REFERENCES rooms (room_id), | ||
-- the stream_ordering of the event | ||
stream_ordering INTEGER NOT NULL, | ||
PRIMARY KEY (destination, room_id) | ||
-- We don't declare a foreign key on stream_orderings here because that'd mean | ||
-- we'd need to either maintain an index (expensive) or do a table scan of | ||
-- destination_rooms whenever we delete an event (also potentially expensive). | ||
reivilibre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
|
||
CREATE INDEX IF NOT EXISTS destination_rooms_room_id | ||
ON destination_rooms (room_id); | ||
reivilibre marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
from mock import Mock | ||
|
||
from synapse.rest import admin | ||
from synapse.rest.client.v1 import login, room | ||
|
||
from tests.test_utils import event_injection, make_awaitable | ||
from tests.unittest import FederatingHomeserverTestCase, override_config | ||
|
||
|
||
class FederationCatchUpTestCases(FederatingHomeserverTestCase): | ||
servlets = [ | ||
admin.register_servlets, | ||
room.register_servlets, | ||
login.register_servlets, | ||
] | ||
|
||
def make_homeserver(self, reactor, clock): | ||
return self.setup_test_homeserver( | ||
federation_transport_client=Mock(spec=["send_transaction"]), | ||
) | ||
|
||
def prepare(self, reactor, clock, hs): | ||
# stub out get_current_hosts_in_room | ||
state_handler = hs.get_state_handler() | ||
|
||
# This mock is crucial for destination_rooms to be populated. | ||
state_handler.get_current_hosts_in_room = Mock( | ||
return_value=make_awaitable(["test", "host2"]) | ||
) | ||
|
||
def get_destination_room(self, room: str, destination: str = "host2") -> dict: | ||
""" | ||
Gets the destination_rooms entry for a (destination, room_id) pair. | ||
|
||
Args: | ||
room: room ID | ||
destination: what destination, default is "host2" | ||
|
||
Returns: | ||
Dictionary of { event_id: str, stream_ordering: int } | ||
""" | ||
event_id, stream_ordering = self.get_success( | ||
self.hs.get_datastore().db_pool.execute( | ||
"test:get_destination_rooms", | ||
None, | ||
""" | ||
SELECT event_id, stream_ordering | ||
FROM destination_rooms dr | ||
JOIN events USING (stream_ordering) | ||
WHERE dr.destination = ? AND dr.room_id = ? | ||
""", | ||
destination, | ||
room, | ||
) | ||
)[0] | ||
return {"event_id": event_id, "stream_ordering": stream_ordering} | ||
|
||
@override_config({"send_federation": True}) | ||
def test_catch_up_destination_rooms_tracking(self): | ||
""" | ||
Tests that we populate the `destination_rooms` table as needed. | ||
""" | ||
self.register_user("u1", "you the one") | ||
u1_token = self.login("u1", "you the one") | ||
room = self.helper.create_room_as("u1", tok=u1_token) | ||
|
||
self.get_success( | ||
event_injection.inject_member_event(self.hs, room, "@user:host2", "join") | ||
) | ||
|
||
event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"] | ||
|
||
row_1 = self.get_destination_room(room) | ||
|
||
event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"] | ||
|
||
row_2 = self.get_destination_room(room) | ||
|
||
# check: events correctly registered in order | ||
self.assertEqual(row_1["event_id"], event_id_1) | ||
self.assertEqual(row_2["event_id"], event_id_2) | ||
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
once you land the other parts of this feature, it'll be good to update this so that the changelog entries get combined.