This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 35d6b91

Resolve and share state_groups for all historical events in batch (MSC2716) (#10975)

Resolve and share `state_groups` for all historical events in batch. This also helps with showing the appropriate avatar/displayname in Element, and it works whenever `/messages` has one of the historical messages as the first message in the batch.

This does have a flaw: if you insert just a single historical event somewhere, it probably won't resolve the state correctly from `/messages` or `/context`, since they will grab a non-historical event above or below with resolved state that never included the historical state. For the same reasons, this also does not work in Element across the transition from actual messages to historical messages. In the Gitter case, this isn't really a problem since all of the historical messages are in one big lump at the beginning of the room. For a future iteration, it might be good to look at `/messages` and `/context` to additionally add the `state` for any historical messages in that batch.

---

How are the `state_groups` shared? To illustrate the `state_group` sharing, see this example:

**Before** (new `state_group` for every event 😬, very inefficient):

```
# Tests from matrix-org/complement#206
$ COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS=1 COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh TestBackfillingHistory/parallel/should_resolve_member_state_events_for_historical_events

create_new_client_event m.room.member event=$_JXfwUDIWS6xKGG4SmZXjSFrizhARM7QblhATVWWUcA state_group=None
create_new_client_event org.matrix.msc2716.insertion event=$1ZBfmBKEjg94d-vGYymKrVYeghwBOuGJ3wubU1-I9y0 state_group=9
create_new_client_event org.matrix.msc2716.insertion event=$Mq2JvRetTyclPuozRI682SAjYp3GqRuPc8_cH5-ezPY state_group=10
create_new_client_event m.room.message event=$MfmY4rBQkxrIp8jVwVMTJ4PKnxSigpG9E2cn7S0AtTo state_group=11
create_new_client_event m.room.message event=$uYOv6V8wiF7xHwOMt-60d1AoOIbqLgrDLz6ZIQDdWUI state_group=12
create_new_client_event m.room.message event=$PAbkJRMxb0bX4A6av463faiAhxkE3FEObM1xB4D0UG4 state_group=13
create_new_client_event org.matrix.msc2716.batch event=$Oy_S7AWN7rJQe_MYwGPEy6RtbYklrI-tAhmfiLrCaKI state_group=14
```

**After** (all events in the batch share `state_group=10`; the base insertion event has `state_group=8`, which matches the `prev_event` we're inserting next to):

```
# Tests from matrix-org/complement#206
$ COMPLEMENT_ALWAYS_PRINT_SERVER_LOGS=1 COMPLEMENT_DIR=../complement ./scripts-dev/complement.sh TestBackfillingHistory/parallel/should_resolve_member_state_events_for_historical_events

create_new_client_event m.room.member event=$PWomJ8PwENYEYuVNoG30gqtybuQQSZ55eldBUSs0i0U state_group=None
create_new_client_event org.matrix.msc2716.insertion event=$e_mCU7Eah9ABF6nQU7lu4E1RxIWccNF05AKaTT5m3lw state_group=9
create_new_client_event org.matrix.msc2716.insertion event=$ui7A3_GdXIcJq0C8GpyrF8X7B3DTjMd_WGCjogax7xU state_group=10
create_new_client_event m.room.message event=$EnTIM5rEGVezQJiYl62uFBl6kJ7B-sMxWqe2D_4FX1I state_group=10
create_new_client_event m.room.message event=$LGx5jGONnBPuNhAuZqHeEoXChd9ryVkuTZatGisOPjk state_group=10
create_new_client_event m.room.message event=$wW0zwoN50lbLu1KoKbybVMxLbKUj7GV_olozIc5i3M0 state_group=10
create_new_client_event org.matrix.msc2716.batch event=$5ZB6dtzqFBCEuMRgpkU201Qhx3WtXZGTz_YgldL6JrQ state_group=10
```
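The before/after difference can be sketched in a few lines of Python. This is illustrative only, not Synapse code: `assign_state_groups_per_event` and `assign_shared_state_group` are hypothetical helpers modelling how `state_group` ids get handed out in each scheme.

```python
# Illustrative sketch of the before/after behaviour, not Synapse code.
from itertools import count


def assign_state_groups_per_event(batch_event_ids, first_group_id):
    """Old behaviour: every historical event gets a fresh state_group."""
    counter = count(first_group_id)
    return {event_id: next(counter) for event_id in batch_event_ids}


def assign_shared_state_group(batch_event_ids, group_id):
    """New behaviour: the whole batch resolves to one shared state_group."""
    return {event_id: group_id for event_id in batch_event_ids}


batch = ["$msg1", "$msg2", "$msg3"]
before = assign_state_groups_per_event(batch, first_group_id=11)
after = assign_shared_state_group(batch, group_id=10)
assert before == {"$msg1": 11, "$msg2": 12, "$msg3": 13}
assert after == {"$msg1": 10, "$msg2": 10, "$msg3": 10}
```

Sharing one group per batch means the state resolution work (and storage) is done once for the whole batch instead of once per event.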
1 parent 4044442 commit 35d6b91

File tree

8 files changed (+114, -47 lines)

changelog.d/10975.feature

+1

```diff
@@ -0,0 +1 @@
+Resolve and share `state_groups` for all [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical events in batch.
```

synapse/handlers/message.py

+34 -23

```diff
@@ -607,29 +607,6 @@ async def create_event(
 
         builder.internal_metadata.historical = historical
 
-        # Strip down the auth_event_ids to only what we need to auth the event.
-        # For example, we don't need extra m.room.member that don't match event.sender
-        if auth_event_ids is not None:
-            # If auth events are provided, prev events must be also.
-            assert prev_event_ids is not None
-
-            temp_event = await builder.build(
-                prev_event_ids=prev_event_ids,
-                auth_event_ids=auth_event_ids,
-                depth=depth,
-            )
-            auth_events = await self.store.get_events_as_list(auth_event_ids)
-            # Create a StateMap[str]
-            auth_event_state_map = {
-                (e.type, e.state_key): e.event_id for e in auth_events
-            }
-            # Actually strip down and use the necessary auth events
-            auth_event_ids = self._event_auth_handler.compute_auth_events(
-                event=temp_event,
-                current_state_ids=auth_event_state_map,
-                for_verification=False,
-            )
-
         event, context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
@@ -936,6 +913,33 @@ async def create_new_client_event(
             Tuple of created event, context
         """
 
+        # Strip down the auth_event_ids to only what we need to auth the event.
+        # For example, we don't need extra m.room.member that don't match event.sender
+        full_state_ids_at_event = None
+        if auth_event_ids is not None:
+            # If auth events are provided, prev events must be also.
+            assert prev_event_ids is not None
+
+            # Copy the full auth state before it stripped down
+            full_state_ids_at_event = auth_event_ids.copy()
+
+            temp_event = await builder.build(
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                depth=depth,
+            )
+            auth_events = await self.store.get_events_as_list(auth_event_ids)
+            # Create a StateMap[str]
+            auth_event_state_map = {
+                (e.type, e.state_key): e.event_id for e in auth_events
+            }
+            # Actually strip down and use the necessary auth events
+            auth_event_ids = self._event_auth_handler.compute_auth_events(
+                event=temp_event,
+                current_state_ids=auth_event_state_map,
+                for_verification=False,
+            )
+
         if prev_event_ids is not None:
             assert (
                 len(prev_event_ids) <= 10
@@ -965,6 +969,13 @@ async def create_new_client_event(
         if builder.internal_metadata.outlier:
             event.internal_metadata.outlier = True
             context = EventContext.for_outlier()
+        elif (
+            event.type == EventTypes.MSC2716_INSERTION
+            and full_state_ids_at_event
+            and builder.internal_metadata.is_historical()
+        ):
+            old_state = await self.store.get_events_as_list(full_state_ids_at_event)
+            context = await self.state.compute_event_context(event, old_state=old_state)
         else:
             context = await self.state.compute_event_context(event)
 
```
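The moved block keeps a copy of the full auth state before stripping it down, so the new `elif` branch can seed `compute_event_context(event, old_state=...)` with it. A minimal sketch of that pattern, using simplified, hypothetical stand-ins for Synapse's event-auth machinery (`compute_needed_auth_events` plays the role of `EventAuthHandler.compute_auth_events`):

```python
# Simplified model of the "copy full state, then strip auth events" pattern.
# `compute_needed_auth_events` is a hypothetical stand-in, not Synapse's API.


def compute_needed_auth_events(sender, auth_event_map):
    # Keep only create/power levels and the sender's own membership.
    return [
        event_id
        for (etype, state_key), event_id in auth_event_map.items()
        if etype in ("m.room.create", "m.room.power_levels")
        or (etype == "m.room.member" and state_key == sender)
    ]


auth_event_map = {
    ("m.room.create", ""): "$create",
    ("m.room.power_levels", ""): "$power",
    ("m.room.member", "@alice:test"): "$alice",
    ("m.room.member", "@bob:test"): "$bob",
}

# Copy the full state before stripping, so it can later seed
# compute_event_context(event, old_state=...) for insertion events.
full_state_ids_at_event = list(auth_event_map.values())
auth_event_ids = compute_needed_auth_events("@alice:test", auth_event_map)

assert "$bob" in full_state_ids_at_event  # the full copy keeps everything
assert sorted(auth_event_ids) == ["$alice", "$create", "$power"]
```

The stripped list keeps the event cheap to auth, while the full copy preserves enough state to build the batch's shared state group.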

synapse/handlers/room_batch.py

+30 -10

```diff
@@ -13,6 +13,10 @@
 logger = logging.getLogger(__name__)
 
 
+def generate_fake_event_id() -> str:
+    return "$fake_" + random_string(43)
+
+
 class RoomBatchHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
@@ -177,6 +181,11 @@ async def persist_state_events_at_start(
 
         state_event_ids_at_start = []
         auth_event_ids = initial_auth_event_ids.copy()
+
+        # Make the state events float off on their own so we don't have a
+        # bunch of `@mxid joined the room` noise between each batch
+        prev_event_id_for_state_chain = generate_fake_event_id()
+
         for state_event in state_events_at_start:
             assert_params_in_dict(
                 state_event, ["type", "origin_server_ts", "content", "sender"]
@@ -200,10 +209,6 @@ async def persist_state_events_at_start(
             # Mark all events as historical
             event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
 
-            # Make the state events float off on their own so we don't have a
-            # bunch of `@mxid joined the room` noise between each batch
-            fake_prev_event_id = "$" + random_string(43)
-
             # TODO: This is pretty much the same as some other code to handle inserting state in this file
             if event_dict["type"] == EventTypes.Member:
                 membership = event_dict["content"].get("membership", None)
@@ -216,7 +221,7 @@ async def persist_state_events_at_start(
                     action=membership,
                     content=event_dict["content"],
                     outlier=True,
-                    prev_event_ids=[fake_prev_event_id],
+                    prev_event_ids=[prev_event_id_for_state_chain],
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
@@ -235,7 +240,7 @@ async def persist_state_events_at_start(
                     ),
                     event_dict,
                     outlier=True,
-                    prev_event_ids=[fake_prev_event_id],
+                    prev_event_ids=[prev_event_id_for_state_chain],
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
@@ -245,6 +250,8 @@ async def persist_state_events_at_start(
 
             state_event_ids_at_start.append(event_id)
             auth_event_ids.append(event_id)
+            # Connect all the state in a floating chain
+            prev_event_id_for_state_chain = event_id
 
         return state_event_ids_at_start
 
@@ -289,6 +296,10 @@ async def persist_historical_events(
         for ev in events_to_create:
             assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
 
+            assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
+                ev["sender"],
+            )
+
             event_dict = {
                 "type": ev["type"],
                 "origin_server_ts": ev["origin_server_ts"],
@@ -311,17 +322,26 @@ async def persist_historical_events(
                 historical=True,
                 depth=inherited_depth,
             )
+
+            assert context._state_group
+
+            # Normally this is done when persisting the event but we have to
+            # pre-emptively do it here because we create all the events first,
+            # then persist them in another pass below. And we want to share
+            # state_groups across the whole batch so this lookup needs to work
+            # for the next event in the batch in this loop.
+            await self.store.store_state_group_id_for_event_id(
+                event_id=event.event_id,
+                state_group_id=context._state_group,
+            )
+
             logger.debug(
                 "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
                 event,
                 prev_event_ids,
                 auth_event_ids,
            )
 
-            assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
-                event.sender,
-            )
-
             events_to_persist.append((event, context))
             event_id = event.event_id
 
```
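The diff above replaces a fresh fake `prev_event_id` per state event with a single chain: the first state event points at one fake event, and each subsequent state event points at the previous one. A self-contained sketch of that chaining (illustrative only; real Synapse events are not plain tuples, and `secrets.token_urlsafe` stands in for Synapse's `random_string`):

```python
# Sketch of the floating state chain: the first state event points at a fake
# prev event, each later state event points at the one before it, so the whole
# chain floats outside the room's main DAG.
import secrets


def generate_fake_event_id() -> str:
    return "$fake_" + secrets.token_urlsafe(32)


def chain_floating_state(state_event_ids):
    chain = []
    prev_event_id_for_state_chain = generate_fake_event_id()
    for event_id in state_event_ids:
        # Each entry is (event_id, its prev_event_ids)
        chain.append((event_id, [prev_event_id_for_state_chain]))
        # Connect the next state event to this one
        prev_event_id_for_state_chain = event_id
    return chain


chain = chain_floating_state(["$join_alice", "$join_bob", "$join_carol"])
assert chain[0][1][0].startswith("$fake_")  # chain starts floating
assert chain[1][1] == ["$join_alice"]
assert chain[2][1] == ["$join_bob"]
```

Chaining the state events to each other (rather than each floating alone) keeps them ordered relative to one another while still avoiding `@mxid joined the room` noise between batches.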
synapse/rest/client/room_batch.py

+6 -9

```diff
@@ -32,7 +32,6 @@
 from synapse.http.site import SynapseRequest
 from synapse.rest.client.transactions import HttpTransactionCache
 from synapse.types import JsonDict
-from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -160,11 +159,6 @@ async def on_POST(
         base_insertion_event = None
         if batch_id_from_query:
             batch_id_to_connect_to = batch_id_from_query
-            # All but the first base insertion event should point at a fake
-            # event, which causes the HS to ask for the state at the start of
-            # the batch later.
-            fake_prev_event_id = "$" + random_string(43)
-            prev_event_ids = [fake_prev_event_id]
         # Otherwise, create an insertion event to act as a starting point.
         #
         # We don't always have an insertion event to start hanging more history
@@ -173,16 +167,14 @@ async def on_POST(
         # an insertion event), in which case we just create a new insertion event
         # that can then get pointed to by a "marker" event later.
         else:
-            prev_event_ids = prev_event_ids_from_query
-
             base_insertion_event_dict = (
                 self.room_batch_handler.create_insertion_event_dict(
                     sender=requester.user.to_string(),
                     room_id=room_id,
                     origin_server_ts=last_event_in_batch["origin_server_ts"],
                 )
             )
-            base_insertion_event_dict["prev_events"] = prev_event_ids.copy()
+            base_insertion_event_dict["prev_events"] = prev_event_ids_from_query.copy()
 
             (
                 base_insertion_event,
@@ -203,6 +195,11 @@ async def on_POST(
                 EventContentFields.MSC2716_NEXT_BATCH_ID
             ]
 
+        # Also connect the historical event chain to the end of the floating
+        # state chain, which causes the HS to ask for the state at the start of
+        # the batch later.
+        prev_event_ids = [state_event_ids_at_start[-1]]
+
         # Create and persist all of the historical events as well as insertion
         # and batch meta events to make the batch navigable in the DAG.
         event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
```

synapse/storage/databases/main/events.py

+6 -4

```diff
@@ -2069,12 +2069,14 @@ def _store_event_state_mappings_txn(
 
             state_groups[event.event_id] = context.state_group
 
-        self.db_pool.simple_insert_many_txn(
+        self.db_pool.simple_upsert_many_txn(
             txn,
             table="event_to_state_groups",
-            values=[
-                {"state_group": state_group_id, "event_id": event_id}
-                for event_id, state_group_id in state_groups.items()
+            key_names=["event_id"],
+            key_values=[[event_id] for event_id, _ in state_groups.items()],
+            value_names=["state_group"],
+            value_values=[
+                [state_group_id] for _, state_group_id in state_groups.items()
             ],
         )
 
```
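The switch from a plain insert to an upsert is needed because historical batch events already wrote their `event_id -> state_group` mapping pre-emptively during batch creation; inserting the same row again at persist time would hit the unique constraint. A toy model of the difference, with an in-memory dict standing in for the `event_to_state_groups` table (not the real storage layer):

```python
# In-memory model of why persisting must upsert rather than insert.


def insert_many(table, rows):
    """Old behaviour: blind insert, duplicate keys blow up."""
    for event_id, state_group in rows:
        if event_id in table:
            raise ValueError("duplicate key: %s" % event_id)
        table[event_id] = state_group


def upsert_many(table, rows):
    """New behaviour: insert a new row or overwrite the existing one."""
    for event_id, state_group in rows:
        table[event_id] = state_group


table = {"$historical": 10}  # mapping stored pre-emptively during batch creation
try:
    insert_many(dict(table), [("$historical", 10)])
    raised = False
except ValueError:
    raised = True
assert raised  # the old code path would hit the unique constraint

upsert_many(table, [("$historical", 10), ("$new", 11)])
assert table == {"$historical": 10, "$new": 11}
```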

synapse/storage/databases/main/room_batch.py

+13

```diff
@@ -36,3 +36,16 @@ async def get_insertion_event_by_batch_id(
             retcol="event_id",
             allow_none=True,
         )
+
+    async def store_state_group_id_for_event_id(
+        self, event_id: str, state_group_id: int
+    ) -> Optional[str]:
+        {
+            await self.db_pool.simple_upsert(
+                table="event_to_state_groups",
+                keyvalues={"event_id": event_id},
+                values={"state_group": state_group_id, "event_id": event_id},
+                # Unique constraint on event_id so we don't have to lock
+                lock=False,
+            )
+        }
```

synapse/storage/schema/__init__.py

+5 -1

```diff
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 64  # remember to update the list below when updating
+SCHEMA_VERSION = 65  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -41,6 +41,10 @@
 
 Changes in SCHEMA_VERSION = 64:
     - MSC2716: Rename related tables and columns from "chunks" to "batches".
+
+Changes in SCHEMA_VERSION = 65:
+    - MSC2716: Remove unique event_id constraint from insertion_event_edges
+      because an insertion event can have multiple edges.
 """
 
 
```

(new schema delta SQL file)

+19

```diff
@@ -0,0 +1,19 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Recreate the insertion_event_edges event_id index without the unique constraint
+-- because an insertion event can have multiple edges.
+DROP INDEX insertion_event_edges_event_id;
+CREATE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);
```
