Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit b2486f6

Browse files
authored
Fix message duplication if something goes wrong after persisting the event (#8476)
Should fix #3365.
1 parent a9a8f29 commit b2486f6

File tree

13 files changed

+481
-32
lines changed

13 files changed

+481
-32
lines changed

changelog.d/8476.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix message duplication if something goes wrong after persisting the event.

synapse/handlers/federation.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2966,17 +2966,20 @@ async def persist_events_and_notify(
29662966
return result["max_stream_id"]
29672967
else:
29682968
assert self.storage.persistence
2969-
max_stream_token = await self.storage.persistence.persist_events(
2969+
2970+
# Note that this returns the events that were persisted, which may not be
2971+
# the same as those passed in if some were deduplicated due to transaction IDs.
2972+
events, max_stream_token = await self.storage.persistence.persist_events(
29702973
event_and_contexts, backfilled=backfilled
29712974
)
29722975

29732976
if self._ephemeral_messages_enabled:
2974-
for (event, context) in event_and_contexts:
2977+
for event in events:
29752978
# If there's an expiry timestamp on the event, schedule its expiry.
29762979
self._message_handler.maybe_schedule_expiry(event)
29772980

29782981
if not backfilled: # Never notify for backfilled events
2979-
for event, _ in event_and_contexts:
2982+
for event in events:
29802983
await self._notify_persisted_event(event, max_stream_token)
29812984

29822985
return max_stream_token.stream

synapse/handlers/message.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ async def create_and_send_nonmember_event(
689689
send this event.
690690
691691
Returns:
692-
The event, and its stream ordering (if state event deduplication happened,
692+
The event, and its stream ordering (if deduplication happened,
693693
the previous, duplicate event).
694694
695695
Raises:
@@ -712,6 +712,19 @@ async def create_and_send_nonmember_event(
712712
# extremities to pile up, which in turn leads to state resolution
713713
# taking longer.
714714
with (await self.limiter.queue(event_dict["room_id"])):
715+
if txn_id and requester.access_token_id:
716+
existing_event_id = await self.store.get_event_id_from_transaction_id(
717+
event_dict["room_id"],
718+
requester.user.to_string(),
719+
requester.access_token_id,
720+
txn_id,
721+
)
722+
if existing_event_id:
723+
event = await self.store.get_event(existing_event_id)
724+
# we know it was persisted, so must have a stream ordering
725+
assert event.internal_metadata.stream_ordering
726+
return event, event.internal_metadata.stream_ordering
727+
715728
event, context = await self.create_event(
716729
requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
717730
)
@@ -913,10 +926,20 @@ async def handle_new_client_event(
913926
extra_users=extra_users,
914927
)
915928
stream_id = result["stream_id"]
916-
event.internal_metadata.stream_ordering = stream_id
929+
event_id = result["event_id"]
930+
if event_id != event.event_id:
931+
# If we get a different event back then it means that it has
932+
# been de-duplicated, so we replace the given event with the
933+
# one already persisted.
934+
event = await self.store.get_event(event_id)
935+
else:
936+
# If we newly persisted the event then we need to update its
937+
# stream_ordering entry manually (as it was persisted on
938+
# another worker).
939+
event.internal_metadata.stream_ordering = stream_id
917940
return event
918941

919-
stream_id = await self.persist_and_notify_client_event(
942+
event = await self.persist_and_notify_client_event(
920943
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
921944
)
922945

@@ -965,11 +988,16 @@ async def persist_and_notify_client_event(
965988
context: EventContext,
966989
ratelimit: bool = True,
967990
extra_users: List[UserID] = [],
968-
) -> int:
991+
) -> EventBase:
969992
"""Called when we have fully built the event, have already
970993
calculated the push actions for the event, and checked auth.
971994
972995
This should only be run on the instance in charge of persisting events.
996+
997+
Returns:
998+
The persisted event. This may be different than the given event if
999+
it was de-duplicated (e.g. because we had already persisted an
1000+
event with the same transaction ID.)
9731001
"""
9741002
assert self.storage.persistence is not None
9751003
assert self._events_shard_config.should_handle(
@@ -1137,9 +1165,13 @@ def is_inviter_member_event(e):
11371165
if prev_state_ids:
11381166
raise AuthError(403, "Changing the room create event is forbidden")
11391167

1140-
event_pos, max_stream_token = await self.storage.persistence.persist_event(
1141-
event, context=context
1142-
)
1168+
# Note that this returns the event that was persisted, which may not be
1169+
# the same as we passed in if it was deduplicated due to transaction IDs.
1170+
(
1171+
event,
1172+
event_pos,
1173+
max_stream_token,
1174+
) = await self.storage.persistence.persist_event(event, context=context)
11431175

11441176
if self._ephemeral_events_enabled:
11451177
# If there's an expiry timestamp on the event, schedule its expiry.
@@ -1160,7 +1192,7 @@ def _notify():
11601192
# matters as sometimes presence code can take a while.
11611193
run_in_background(self._bump_active_time, requester.user)
11621194

1163-
return event_pos.stream
1195+
return event
11641196

11651197
async def _bump_active_time(self, user: UserID) -> None:
11661198
try:

synapse/handlers/room_member.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,17 @@ async def _local_membership_update(
171171
if requester.is_guest:
172172
content["kind"] = "guest"
173173

174+
# Check if we already have an event with a matching transaction ID. (We
175+
# do this check just before we persist an event as well, but may as well
176+
# do it up front for efficiency.)
177+
if txn_id and requester.access_token_id:
178+
existing_event_id = await self.store.get_event_id_from_transaction_id(
179+
room_id, requester.user.to_string(), requester.access_token_id, txn_id,
180+
)
181+
if existing_event_id:
182+
event_pos = await self.store.get_position_for_event(existing_event_id)
183+
return existing_event_id, event_pos.stream
184+
174185
event, context = await self.event_creation_handler.create_event(
175186
requester,
176187
{
@@ -679,7 +690,7 @@ async def send_membership_event(
679690
if is_blocked:
680691
raise SynapseError(403, "This room has been blocked on this server")
681692

682-
await self.event_creation_handler.handle_new_client_event(
693+
event = await self.event_creation_handler.handle_new_client_event(
683694
requester, event, context, extra_users=[target_user], ratelimit=ratelimit
684695
)
685696

synapse/replication/http/send_event.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
4646
"ratelimit": true,
4747
"extra_users": [],
4848
}
49+
50+
200 OK
51+
52+
{ "stream_id": 12345, "event_id": "$abcdef..." }
53+
54+
The returned event ID may not match the sent event if it was deduplicated.
4955
"""
5056

5157
NAME = "send_event"
@@ -116,11 +122,17 @@ async def _handle_request(self, request, event_id):
116122
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
117123
)
118124

119-
stream_id = await self.event_creation_handler.persist_and_notify_client_event(
125+
event = await self.event_creation_handler.persist_and_notify_client_event(
120126
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
121127
)
122128

123-
return 200, {"stream_id": stream_id}
129+
return (
130+
200,
131+
{
132+
"stream_id": event.internal_metadata.stream_ordering,
133+
"event_id": event.event_id,
134+
},
135+
)
124136

125137

126138
def register_servlets(hs, http_server):

synapse/storage/databases/main/events.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ def _persist_events_txn(
361361

362362
self._store_event_txn(txn, events_and_contexts=events_and_contexts)
363363

364+
self._persist_transaction_ids_txn(txn, events_and_contexts)
365+
364366
# Insert into event_to_state_groups.
365367
self._store_event_state_mappings_txn(txn, events_and_contexts)
366368

@@ -405,6 +407,35 @@ def _persist_events_txn(
405407
# room_memberships, where applicable.
406408
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
407409

410+
def _persist_transaction_ids_txn(
411+
self,
412+
txn: LoggingTransaction,
413+
events_and_contexts: List[Tuple[EventBase, EventContext]],
414+
):
415+
"""Persist the mapping from transaction IDs to event IDs (if defined).
416+
"""
417+
418+
to_insert = []
419+
for event, _ in events_and_contexts:
420+
token_id = getattr(event.internal_metadata, "token_id", None)
421+
txn_id = getattr(event.internal_metadata, "txn_id", None)
422+
if token_id and txn_id:
423+
to_insert.append(
424+
{
425+
"event_id": event.event_id,
426+
"room_id": event.room_id,
427+
"user_id": event.sender,
428+
"token_id": token_id,
429+
"txn_id": txn_id,
430+
"inserted_ts": self._clock.time_msec(),
431+
}
432+
)
433+
434+
if to_insert:
435+
self.db_pool.simple_insert_many_txn(
436+
txn, table="event_txn_id", values=to_insert,
437+
)
438+
408439
def _update_current_state_txn(
409440
self,
410441
txn: LoggingTransaction,

synapse/storage/databases/main/events_worker.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15-
1615
import itertools
1716
import logging
1817
import threading
@@ -137,6 +136,15 @@ def __init__(self, database: DatabasePool, db_conn, hs):
137136
db_conn, "events", "stream_ordering", step=-1
138137
)
139138

139+
if not hs.config.worker.worker_app:
140+
# We periodically clean out old transaction ID mappings
141+
self._clock.looping_call(
142+
run_as_background_process,
143+
5 * 60 * 1000,
144+
"_cleanup_old_transaction_ids",
145+
self._cleanup_old_transaction_ids,
146+
)
147+
140148
self._get_event_cache = Cache(
141149
"*getEvent*",
142150
keylen=3,
@@ -1308,3 +1316,76 @@ def get_next_event_to_expire_txn(txn):
13081316
return await self.db_pool.runInteraction(
13091317
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
13101318
)
1319+
1320+
async def get_event_id_from_transaction_id(
    self, room_id: str, user_id: str, token_id: int, txn_id: str
) -> Optional[str]:
    """Look up whether an event has already been persisted for this
    transaction ID.

    Transaction IDs are scoped to (room, user, access token), so all three
    are part of the lookup key.

    Returns:
        The event ID of the previously persisted event, or None if the
        transaction ID has not been seen.
    """
    lookup_key = {
        "room_id": room_id,
        "user_id": user_id,
        "token_id": token_id,
        "txn_id": txn_id,
    }
    return await self.db_pool.simple_select_one_onecol(
        table="event_txn_id",
        keyvalues=lookup_key,
        retcol="event_id",
        allow_none=True,
        desc="get_event_id_from_transaction_id",
    )
1338+
1339+
async def get_already_persisted_events(
    self, events: "Iterable[EventBase]"
) -> Dict[str, str]:
    """Find which of the given events duplicate an already-persisted event.

    Returns a mapping from the event ID of a duplicate in `events` to the
    event ID of the existing event it duplicates (matched on the
    room/token/transaction-ID triple).

    Duplicates *within* the given batch are also detected; those map to the
    *first* event in the batch with that transaction ID.
    """
    mapping = {}  # type: Dict[str, str]
    # Tracks the canonical event ID for each (room, token, txn) seen so far.
    seen_txn_ids = {}  # type: Dict[Tuple[str, int, str], str]

    for event in events:
        meta = event.internal_metadata
        token_id = getattr(meta, "token_id", None)
        txn_id = getattr(meta, "txn_id", None)
        if not (token_id and txn_id):
            # No transaction ID: nothing to de-duplicate against.
            continue

        key = (event.room_id, token_id, txn_id)

        # First, is this a duplicate of an earlier event in this batch?
        canonical = seen_txn_ids.get(key)
        if canonical:
            mapping[event.event_id] = canonical
            continue

        # Otherwise, is it a duplicate of something already persisted?
        canonical = await self.get_event_id_from_transaction_id(
            event.room_id, event.sender, token_id, txn_id
        )
        if canonical:
            mapping[event.event_id] = canonical
            seen_txn_ids[key] = canonical
        else:
            seen_txn_ids[key] = event.event_id

    return mapping
1376+
1377+
async def _cleanup_old_transaction_ids(self):
1378+
"""Cleans out transaction id mappings older than 24hrs.
1379+
"""
1380+
1381+
def _cleanup_old_transaction_ids_txn(txn):
1382+
sql = """
1383+
DELETE FROM event_txn_id
1384+
WHERE inserted_ts < ?
1385+
"""
1386+
one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000
1387+
txn.execute(sql, (one_day_ago,))
1388+
1389+
return await self.db_pool.runInteraction(
1390+
"_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn,
1391+
)

synapse/storage/databases/main/registration.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ async def add_access_token_to_user(
10031003
token: str,
10041004
device_id: Optional[str],
10051005
valid_until_ms: Optional[int],
1006-
) -> None:
1006+
) -> int:
10071007
"""Adds an access token for the given user.
10081008
10091009
Args:
@@ -1013,6 +1013,8 @@ async def add_access_token_to_user(
10131013
valid_until_ms: when the token is valid until. None for no expiry.
10141014
Raises:
10151015
StoreError if there was a problem adding this.
1016+
Returns:
1017+
The token ID
10161018
"""
10171019
next_id = self._access_tokens_id_gen.get_next()
10181020

@@ -1028,6 +1030,8 @@ async def add_access_token_to_user(
10281030
desc="add_access_token_to_user",
10291031
)
10301032

1033+
return next_id
1034+
10311035
def _set_device_for_access_token_txn(self, txn, token: str, device_id: str) -> str:
10321036
old_device_id = self.db_pool.simple_select_one_onecol_txn(
10331037
txn, "access_tokens", {"token": token}, "device_id"
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/* Copyright 2020 The Matrix.org Foundation C.I.C
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


-- A map of recent events persisted with transaction IDs. Used to de-duplicate
-- send-event requests that arrive with the same transaction ID.
--
-- Note: transaction IDs are scoped to the room ID / user ID / access token
-- that was used to make the request.
--
-- Note: the foreign key constraints are ON DELETE CASCADE: once the event or
-- the access token is deleted there is nothing to de-duplicate against, so
-- the mapping row should go too.
CREATE TABLE IF NOT EXISTS event_txn_id (
    event_id TEXT NOT NULL,
    room_id TEXT NOT NULL,
    user_id TEXT NOT NULL,
    token_id BIGINT NOT NULL,
    txn_id TEXT NOT NULL,
    inserted_ts BIGINT NOT NULL,
    FOREIGN KEY (event_id)
        REFERENCES events (event_id) ON DELETE CASCADE,
    FOREIGN KEY (token_id)
        REFERENCES access_tokens (id) ON DELETE CASCADE
);

-- One mapping per event, and one event per (room, user, token, txn) scope.
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id);
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id);
-- Supports the periodic cleanup of old mappings by insertion time.
CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts);

0 commit comments

Comments
 (0)