This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make historical events discoverable from backfill for servers without any scrollback history (MSC2716) (federation) #10245

Merged
Changes from 20 commits
Commits
47 commits
d2e2aa7
Make historical messages available to federated servers
MadLittleMods Jun 24, 2021
2d942ec
Debug message not available on federation
MadLittleMods Jun 25, 2021
38bcf13
Add base starting insertion point when no chunk ID is provided
MadLittleMods Jun 25, 2021
e405a23
Fix messages from multiple senders in historical chunk
MadLittleMods Jun 29, 2021
36f1565
Remove debug lines
MadLittleMods Jun 29, 2021
05d6c51
Messing with selecting insertion event extremeties
MadLittleMods Jul 7, 2021
defc536
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 7, 2021
dfad8a8
Move db schema change to new version
MadLittleMods Jul 7, 2021
7d850db
Add more better comments
MadLittleMods Jul 7, 2021
164dee4
Make a fake requester with just what we need
MadLittleMods Jul 7, 2021
04b1f7e
Store insertion events in table
MadLittleMods Jul 8, 2021
b703962
Make base insertion event float off on its own
MadLittleMods Jul 8, 2021
8c205e5
Validate that the app service can actually control the given user
MadLittleMods Jul 9, 2021
7b8b2d1
Add some better comments on what we're trying to check for
MadLittleMods Jul 9, 2021
281588f
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 9, 2021
4226165
Continue debugging
MadLittleMods Jul 12, 2021
baae5d8
Share validation logic
MadLittleMods Jul 12, 2021
c05e43b
Add inserted historical messages to /backfill response
MadLittleMods Jul 13, 2021
02b1bea
Remove debug sql queries
MadLittleMods Jul 13, 2021
66cf5be
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 13, 2021
ab8011b
Some marker event implementation trials
MadLittleMods Jul 14, 2021
f20ba02
Clean up PR
MadLittleMods Jul 14, 2021
64aeb73
Rename insertion_event_id to just event_id
MadLittleMods Jul 14, 2021
ea7c30d
Add some better sql comments
MadLittleMods Jul 14, 2021
9a6fd3f
More accurate description
MadLittleMods Jul 14, 2021
0f6179f
Add changelog
MadLittleMods Jul 14, 2021
5970e3f
Make it clear what MSC the change is part of
MadLittleMods Jul 14, 2021
bc13396
Add more detail on which insertion event came through
MadLittleMods Jul 14, 2021
669da52
Address review and improve sql queries
MadLittleMods Jul 14, 2021
9a86e05
Only use event_id as unique constraint
MadLittleMods Jul 14, 2021
8999567
Fix test case where insertion event is already in the normal DAG
MadLittleMods Jul 15, 2021
35a4569
Remove debug changes
MadLittleMods Jul 15, 2021
b2be8ce
Switch to chunk events so we can auth via power_levels
MadLittleMods Jul 20, 2021
04a29fe
Switch to chunk events for federation
MadLittleMods Jul 20, 2021
258fa57
Add unstable room version to support new historical PL
MadLittleMods Jul 20, 2021
9352635
Fix federated events being rejected for no state_groups
MadLittleMods Jul 21, 2021
5c454b7
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 21, 2021
e881cff
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 21, 2021
c9330ec
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 23, 2021
a8c5311
Only connect base insertion event to prev_event_ids
MadLittleMods Jul 23, 2021
ae606c7
Make it possible to get the room_version with txn
MadLittleMods Jul 24, 2021
bc896cc
Allow but ignore historical events in unsupported room version
MadLittleMods Jul 24, 2021
f231066
Move to unique index syntax
MadLittleMods Jul 24, 2021
465b3d8
High-level document how the insertion->chunk lookup works
MadLittleMods Jul 24, 2021
44bb3f0
Remove create_event fallback for room_versions
MadLittleMods Jul 28, 2021
4d936b5
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 28, 2021
706770c
Use updated method name
MadLittleMods Jul 28, 2021
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
fi

# Run the tests!
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests -run TestBackfillingHistory
14 changes: 7 additions & 7 deletions synapse/events/utils.py
@@ -253,13 +253,13 @@ def format_event_for_client_v1(d):

def format_event_for_client_v2(d):
drop_keys = (
"auth_events",
"prev_events",
"hashes",
"signatures",
"depth",
"origin",
"prev_state",
# "auth_events",
# "prev_events",
# "hashes",
# "signatures",
# "depth",
# "origin",
# "prev_state",
)
for key in drop_keys:
d.pop(key, None)
14 changes: 14 additions & 0 deletions synapse/handlers/federation.py
@@ -1054,10 +1054,14 @@ async def maybe_backfill(
with (await self._room_backfill.queue(room_id)):
return await self._maybe_backfill_inner(room_id, current_depth, limit)

# Todo
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
logger.info(
"_maybe_backfill_inner extremities(%d)=%s", len(extremities), extremities
)

if not extremities:
logger.debug("Not backfilling as no extremeties found.")
@@ -2123,8 +2127,18 @@ async def on_backfill_request(
limit = min(limit, 100)

events = await self.store.get_backfill_events(room_id, pdu_list, limit)
logger.info(
"on_backfill_request get_backfill_events events(%d)=%s",
len(events),
[f'{ev.content.get("body")}: {ev.type} ({ev.event_id})' for ev in events],
)

events = await filter_events_for_server(self.storage, origin, events)
logger.info(
"on_backfill_request filter_events_for_server events(%d)=%s",
len(events),
events,
)

return events

64 changes: 63 additions & 1 deletion synapse/storage/databases/main/event_federation.py
@@ -936,15 +936,50 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
# We want to make sure that we do a breadth-first, "depth" ordered
# search.

# Look for the prev_event_id connected to the given event_id
query = (
"SELECT depth, prev_event_id FROM event_edges"
# Get the depth of the prev_event_id from the events table
" INNER JOIN events"
" ON prev_event_id = events.event_id"
# Find an event which matches the given event_id
" WHERE event_edges.event_id = ?"
" AND event_edges.is_state = ?"
" LIMIT ?"
)

# Look for the "insertion" events connected to the given event_id
# TODO: Do we need to worry about selecting only from the given room_id? The other query above doesn't
connected_insertion_event_query = (
"SELECT e.depth, i.insertion_event_id FROM insertion_event_edges AS i"
# Get the depth of the insertion event from the events table
" INNER JOIN events AS e"
" ON e.event_id = i.insertion_event_id"
# Find an insertion event which points via prev_events to the given event_id
" WHERE i.insertion_prev_event_id = ?"
" LIMIT ?"
)

# Find any chunk connections of a given insertion event
# TODO: Do we need to worry about selecting only from the given room_id? The other query above doesn't
chunk_connection_query = (
"SELECT e.depth, c.event_id FROM insertion_events AS i"
# Find the chunk that connects to the given insertion event
" INNER JOIN chunk_edges AS c"
" ON i.next_chunk_id = c.chunk_id"
# Get the depth of the chunk start event from the events table
" INNER JOIN events AS e"
" ON e.event_id = c.event_id"
# Find an insertion event which matches the given event_id
" WHERE i.insertion_event_id = ?"
" LIMIT ?"
)

# In a PriorityQueue, the lowest valued entries are retrieved first.
# We're using depth as the priority in the queue.
# Depth is lowest at the oldest-in-time message and highest at the
# newest-in-time message. We add events to the queue with a negative depth so that
# we process the newest-in-time messages first going backwards in time.
queue = PriorityQueue()

for event_id in event_list:
@@ -970,9 +1005,36 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

event_results.add(event_id)

txn.execute(
connected_insertion_event_query, (event_id, limit - len(event_results))
)
connected_insertion_event_id_results = list(txn)
logger.info(
"connected_insertion_event_query %s",
connected_insertion_event_id_results,
)
for row in connected_insertion_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))

# Find any chunk connections for the given insertion event
txn.execute(
chunk_connection_query, (row[1], limit - len(event_results))
)
chunk_start_event_id_results = list(txn)
logger.info(
"chunk_start_event_id_results %s",
chunk_start_event_id_results,
)
for row in chunk_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))

txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = list(txn)
logger.info("prev_event_ids %s", prev_event_id_results)

for row in txn:
for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))

110 changes: 110 additions & 0 deletions synapse/storage/databases/main/events.py
@@ -1504,6 +1504,10 @@ def _update_metadata_tables_txn(

self._handle_event_relations(txn, event)

self._handle_insertion_event(txn, event)
self._handle_marker_event(txn, event)
self._handle_chunk_id(txn, event)
MadLittleMods (Contributor, Author) commented:

Where is the best place to limit who can add an insertion event or a chunk connection content field?

A project member replied:

We probably want to reject such events, so we probably want to add some stuff to the event authorization code.

MadLittleMods (Contributor, Author) commented on Jul 14, 2021:

I am having trouble fitting this into the existing code.

  • I need to make sure that only application services can send insertion, marker, and events with chunk IDs
  • Other homeservers need to be able to accept these events over federation. But they should probably only trust the originating homeserver where the room is. How does the originating homeserver even work when future room IDs don't include the homeserver?
    • Maybe it should be some power level and we can specify the application service user ID with a high enough power level?

  • synapse/event_auth.py -> check(...):
    • It feels like the checks should go here next to check_redaction
    • That class doesn't have access to the store so I can't use store.get_app_service_by_user_id(...) to make sure the sender is an application service. Plus this code is probably run for events coming over federation which don't know which sender is an application service.
    • It could fit here if it was a new power level since we have the auth events
  • synapse/handlers/event_auth.py -> check_from_context(...)
    • I can use store.get_app_service_by_user_id(...) but other code is still only protected by the raw check(...)

    def _check_if_allowed_to_send_historical_events(
        self,
        event: EventBase,
    ):
        """Check whether the event sender is allowed to add insertion, marker,
        or events with a chunk ID

        Raises:
            AuthError if the event sender is not an application service
        """

        if (
            event.type != EventTypes.MSC2716_INSERTION
            and event.type != EventTypes.MSC2716_MARKER
            and event.content.get(EventContentFields.MSC2716_CHUNK_ID) is None
        ):
            # Not a historical event, so there is nothing to check
            return

        app_service = self._store.get_app_service_by_user_id(event.sender)

        if app_service is None:
            raise AuthError(403, "Only application services can send insertion events")

MadLittleMods (Contributor, Author) commented on Jul 14, 2021:

Using the m.room.power_levels -> events ("The level required to send specific event types. This is a mapping from event type to power level required.") field seems perfect 🎉

But we would probably want to default the power level for those events in existing rooms which don't have it set yet. Maybe default to only the creator or admins can do it if not explicitly set in power levels. Or keep it simple and not allow the history based events at all unless the power level was set. Is there any precedent for this? There is events_default but it's usually set low to allow any events.

The one problem I see is that there doesn't seem to be precedent for controlling a content field. The content.chunk_id field would also be on normal events from a non-application service user so I don't see a way to differentiate and auth it 🤔
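
For illustration only, here is a rough sketch of what such a power-levels mapping could look like. The event type names use the MSC2716 unstable prefixes and the numbers are assumptions, not something this PR ships:

# Hypothetical m.room.power_levels content (illustrative values only): only
# users at a high power level, e.g. the application service user, could send
# the MSC2716 event types. Note this still would not gate a normal event that
# merely carries a content["chunk_id"] field, which is the gap noted above.
example_power_levels_content = {
    "events_default": 0,
    "events": {
        "org.matrix.msc2716.insertion": 100,
        "org.matrix.msc2716.marker": 100,
    },
}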

MadLittleMods (Contributor, Author) commented:

Added unstable room version org.matrix.msc2716 which adds the historical power level that controls whether you can send insertion, chunk, and marker events ✅

I switched to chunk events so we can easily auth them against the PL level because they are sent by the application service user ID with the proper PL level for the room. See #10432
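
A rough, non-authoritative sketch of how such a check could work, assuming a top-level "historical" key in the power-levels content and the MSC2716 unstable event type names (the real auth logic lives in Synapse's event auth code and may differ):

# Hedged sketch only: gate MSC2716 events behind a dedicated "historical"
# power level in the unstable org.matrix.msc2716 room version. The default of
# 100 and the helper's signature are assumptions made for illustration.
from synapse.api.errors import AuthError

MSC2716_EVENT_TYPES = (
    "org.matrix.msc2716.insertion",
    "org.matrix.msc2716.chunk",
    "org.matrix.msc2716.marker",
)


def check_historical_power_level(event, power_levels_content, sender_level):
    if event.type not in MSC2716_EVENT_TYPES:
        # Ordinary events are not affected by the historical power level
        return

    required_level = power_levels_content.get("historical", 100)
    if sender_level < required_level:
        raise AuthError(
            403, "You don't have permission to send historical-related events"
        )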


# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
Expand Down Expand Up @@ -1756,6 +1760,112 @@ def _handle_event_relations(self, txn, event):
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))

def _handle_insertion_event(self, txn, event):
"""Handles inserting insertion extremeties during peristence of marker events

Args:
txn
event (EventBase)
"""

if event.type != EventTypes.MSC2716_INSERTION:
# Not an insertion event
return

logger.info("_handle_insertion_event %s", event)

next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
if next_chunk_id is None:
# Invalid insertion event without next chunk ID
return

# Keep track of the insertion event and the chunk ID
self.db_pool.simple_insert_txn(
txn,
table="insertion_events",
values={
"insertion_event_id": event.event_id,
"room_id": event.room_id,
"next_chunk_id": next_chunk_id,
},
)

# Insert an edge for every prev_event connection
for prev_event_id in event.prev_events:
self.db_pool.simple_insert_txn(
txn,
table="insertion_event_edges",
values={
"insertion_event_id": event.event_id,
"room_id": event.room_id,
"insertion_prev_event_id": prev_event_id,
},
)

def _handle_marker_event(self, txn, event):
"""Handles inserting insertion extremeties during peristence of marker events

Args:
txn
event (EventBase)
"""

if event.type != EventTypes.MSC2716_MARKER:
# Not a marker event
return

logger.info("_handle_marker_event %s", event)

# TODO: We should attempt to backfill the insertion event instead
# of trying to pack all of the info in the marker event. Otherwise,
# we need to pack in the insertion_prev_events and insertion_next_chunk_id.

# insertion_event_id = event.content.get(
# EventContentFields.MSC2716_MARKER_INSERTION
# )
# insertion_prev_event_ids = event.content.get(
# EventContentFields.MSC2716_MARKER_INSERTION_PREV_EVENTS
# )
# if not insertion_event_id or not insertion_prev_event_ids:
# # Invalid marker event
# return

# for prev_event_id in insertion_prev_event_ids:
# self.db_pool.simple_insert_txn(
# txn,
# table="insertion_event_edges",
# values={
# "insertion_event_id": insertion_event_id,
# "room_id": event.room_id,
# "insertion_prev_event_id": prev_event_id,
# },
# )

def _handle_chunk_id(self, txn, event):
"""Handles inserting the chunk connections between the event at the
start of a chunk and an insertion event

Args:
    txn
    event (EventBase)
"""

chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
if chunk_id is None:
# No chunk connection to persist
return

logger.info("_handle_chunk_id %s %s", chunk_id, event)

# Keep track of the insertion event and the chunk ID
self.db_pool.simple_insert_txn(
txn,
table="chunk_edges",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"chunk_id": chunk_id,
},
)

def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
@@ -0,0 +1,50 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a table that keeps track of "insertion" events back in the history
-- when we get a "marker" event over the "live" timeline. When navigating the DAG
-- and we hit an event which matches `insertion_prev_event_id`, it should backfill
-- the "insertion" event and start navigating from there.

CREATE TABLE IF NOT EXISTS insertion_events(
insertion_event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
next_chunk_id TEXT NOT NULL,
UNIQUE (insertion_event_id, room_id, next_chunk_id)
);

CREATE INDEX IF NOT EXISTS insertion_events_insertion_room_id ON insertion_events(room_id);
CREATE INDEX IF NOT EXISTS insertion_events_insertion_event_id ON insertion_events(insertion_event_id);
CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id);

CREATE TABLE IF NOT EXISTS insertion_event_edges(
insertion_event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
insertion_prev_event_id TEXT NOT NULL,
UNIQUE (insertion_event_id, room_id, insertion_prev_event_id)
);

CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_event_id ON insertion_event_edges(insertion_event_id);
CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);

CREATE TABLE IF NOT EXISTS chunk_edges(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
chunk_id TEXT NOT NULL,
UNIQUE (event_id, room_id)
);

CREATE INDEX IF NOT EXISTS chunk_edges_chunk_id ON chunk_edges(chunk_id);
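
To make the relationships concrete, here is an illustrative set of rows these tables might hold after an application service inserts one historical chunk. The event IDs, room ID, and chunk ID below are made-up placeholders, not values from this PR:

# Illustrative rows only; all IDs are placeholders. An insertion event
# "$insertionA" is anchored (via prev_events) at the existing event "$live1"
# and points forward to chunk "chunkA". The event at the start of that chunk
# carries content["chunk_id"] = "chunkA", which is what chunk_edges records.
insertion_events_row = {
    "insertion_event_id": "$insertionA",
    "room_id": "!room:example.org",
    "next_chunk_id": "chunkA",
}
insertion_event_edges_row = {
    "insertion_event_id": "$insertionA",
    "room_id": "!room:example.org",
    "insertion_prev_event_id": "$live1",
}
chunk_edges_row = {
    "event_id": "$chunkStartA",
    "room_id": "!room:example.org",
    "chunk_id": "chunkA",
}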