Skip to content

Commit 8c58eb7

Browse files
Add event.internal_metadata.instance_name (#17300)
Add `event.internal_metadata.instance_name` (the worker instance that persisted the event) to go alongside the existing `event.internal_metadata.stream_ordering`. `instance_name` is useful to properly compare and query for events with a token since you need to compare both the `stream_ordering` and `instance_name` against the vector clock/`instance_map` in the `RoomStreamToken`. This is pre-requisite work and may be used in #17293 Adding `event.internal_metadata.instance_name` was first mentioned in the initial Sliding Sync PR while pairing with @erikjohnston, see 09609cb#diff-5cd773fb307aa754bd3948871ba118b1ef0303f4d72d42a2d21e38242bf4e096R405-R410
1 parent ebdce69 commit 8c58eb7

File tree

10 files changed

+31
-9
lines changed

10 files changed

+31
-9
lines changed

changelog.d/17300.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.

rust/src/events/internal_metadata.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
204204
/// The stream ordering of this event. None, until it has been persisted.
205205
#[pyo3(get, set)]
206206
stream_ordering: Option<NonZeroI64>,
207+
#[pyo3(get, set)]
208+
instance_name: Option<String>,
207209

208210
/// whether this event is an outlier (ie, whether we have the state at that
209211
/// point in the DAG)
@@ -232,6 +234,7 @@ impl EventInternalMetadata {
232234
Ok(EventInternalMetadata {
233235
data,
234236
stream_ordering: None,
237+
instance_name: None,
235238
outlier: false,
236239
})
237240
}

synapse/events/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
9090
pruned_event.internal_metadata.stream_ordering = (
9191
event.internal_metadata.stream_ordering
9292
)
93+
pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
9394
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
9495

9596
# Mark the event as redacted
@@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
116117
new_event.internal_metadata.stream_ordering = (
117118
event.internal_metadata.stream_ordering
118119
)
120+
new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
119121
new_event.internal_metadata.outlier = event.internal_metadata.outlier
120122

121123
return new_event

synapse/handlers/message.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,6 +1551,7 @@ async def _persist_events(
15511551
# stream_ordering entry manually (as it was persisted on
15521552
# another worker).
15531553
event.internal_metadata.stream_ordering = stream_id
1554+
event.internal_metadata.instance_name = writer_instance
15541555

15551556
return event
15561557

synapse/storage/databases/main/events.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ async def _persist_events_and_state_updates(
207207
async with stream_ordering_manager as stream_orderings:
208208
for (event, _), stream in zip(events_and_contexts, stream_orderings):
209209
event.internal_metadata.stream_ordering = stream
210+
event.internal_metadata.instance_name = self._instance_name
210211

211212
await self.db_pool.runInteraction(
212213
"persist_events",

synapse/storage/databases/main/events_worker.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ class _EventRow:
156156

157157
event_id: str
158158
stream_ordering: int
159+
instance_name: str
159160
json: str
160161
internal_metadata: str
161162
format_version: Optional[int]
@@ -1354,6 +1355,7 @@ async def _fetch_event_ids_and_get_outstanding_redactions(
13541355
rejected_reason=rejected_reason,
13551356
)
13561357
original_ev.internal_metadata.stream_ordering = row.stream_ordering
1358+
original_ev.internal_metadata.instance_name = row.instance_name
13571359
original_ev.internal_metadata.outlier = row.outlier
13581360

13591361
# Consistency check: if the content of the event has been modified in the
@@ -1439,6 +1441,7 @@ def _fetch_event_rows(
14391441
SELECT
14401442
e.event_id,
14411443
e.stream_ordering,
1444+
e.instance_name,
14421445
ej.internal_metadata,
14431446
ej.json,
14441447
ej.format_version,
@@ -1462,13 +1465,14 @@ def _fetch_event_rows(
14621465
event_dict[event_id] = _EventRow(
14631466
event_id=event_id,
14641467
stream_ordering=row[1],
1465-
internal_metadata=row[2],
1466-
json=row[3],
1467-
format_version=row[4],
1468-
room_version_id=row[5],
1469-
rejected_reason=row[6],
1468+
instance_name=row[2],
1469+
internal_metadata=row[3],
1470+
json=row[4],
1471+
format_version=row[5],
1472+
room_version_id=row[6],
1473+
rejected_reason=row[7],
14701474
redactions=[],
1471-
outlier=bool(row[7]), # This is an int in SQLite3
1475+
outlier=bool(row[8]), # This is an int in SQLite3
14721476
)
14731477

14741478
# check for redactions

synapse/synapse_rust/events.pyi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ class EventInternalMetadata:
1919

2020
stream_ordering: Optional[int]
2121
"""the stream ordering of this event. None, until it has been persisted."""
22+
instance_name: Optional[str]
23+
"""the instance name of the server that persisted this event. None, until it has been persisted."""
2224

2325
outlier: bool
2426
"""whether this event is an outlier (ie, whether we have the state at that

tests/events/test_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,13 +625,16 @@ def test_unsigned_is_copied(self) -> None:
625625
)
626626
original.internal_metadata.stream_ordering = 1234
627627
self.assertEqual(original.internal_metadata.stream_ordering, 1234)
628+
original.internal_metadata.instance_name = "worker1"
629+
self.assertEqual(original.internal_metadata.instance_name, "worker1")
628630

629631
cloned = clone_event(original)
630632
cloned.unsigned["b"] = 3
631633

632634
self.assertEqual(original.unsigned, {"a": 1, "b": 2})
633635
self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
634636
self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
637+
self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
635638
self.assertEqual(cloned.internal_metadata.txn_id, "txn")
636639

637640

tests/replication/storage/test_events.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ def test_invites(self) -> None:
141141
self.persist(type="m.room.create", key="", creator=USER_ID)
142142
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
143143
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
144+
assert event.internal_metadata.instance_name is not None
144145
assert event.internal_metadata.stream_ordering is not None
145146

146147
self.replicate()
@@ -155,7 +156,7 @@ def test_invites(self) -> None:
155156
"invite",
156157
event.event_id,
157158
PersistedEventPosition(
158-
self.hs.get_instance_name(),
159+
event.internal_metadata.instance_name,
159160
event.internal_metadata.stream_ordering,
160161
),
161162
RoomVersions.V1.identifier,
@@ -232,11 +233,12 @@ def test_get_rooms_for_user_with_stream_ordering(self) -> None:
232233
j2 = self.persist(
233234
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
234235
)
236+
assert j2.internal_metadata.instance_name is not None
235237
assert j2.internal_metadata.stream_ordering is not None
236238
self.replicate()
237239

238240
expected_pos = PersistedEventPosition(
239-
"master", j2.internal_metadata.stream_ordering
241+
j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
240242
)
241243
self.check(
242244
"get_rooms_for_user_with_stream_ordering",
@@ -288,6 +290,7 @@ def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
288290
msg, msgctx = self.build_event()
289291
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
290292
self.replicate()
293+
assert j2.internal_metadata.instance_name is not None
291294
assert j2.internal_metadata.stream_ordering is not None
292295

293296
event_source = RoomEventSource(self.hs)
@@ -329,7 +332,8 @@ def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
329332
# joined_rooms list.
330333
if membership_changes:
331334
expected_pos = PersistedEventPosition(
332-
"master", j2.internal_metadata.stream_ordering
335+
j2.internal_metadata.instance_name,
336+
j2.internal_metadata.stream_ordering,
333337
)
334338
self.assertEqual(
335339
joined_rooms,

tests/storage/test_event_chain.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ def persist(
431431

432432
for e in events:
433433
e.internal_metadata.stream_ordering = self._next_stream_ordering
434+
e.internal_metadata.instance_name = self.hs.get_instance_name()
434435
self._next_stream_ordering += 1
435436

436437
def _persist(txn: LoggingTransaction) -> None:

0 commit comments

Comments
 (0)