
Commit e3a49f4

Fix missing sync events during historical batch imports (#12319)
Discovered after much in-depth investigation in #12281.

Closes: #12281
Closes: #3305

Signed off by: Nick Mills-Barrett nick@beeper.com
1 parent d24cd17 commit e3a49f4
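The core of the change is to pick a room's "last event" by stream ordering rather than topological ordering, because MSC2716 historical events are stored with a negative stream ordering but can carry the highest depth in the room. A minimal, self-contained sketch of the difference (illustrative only, not Synapse code; the event IDs and orderings are invented):

from collections import namedtuple

Event = namedtuple("Event", ["event_id", "topological_ordering", "stream_ordering"])

events = [
    Event("$invite", 10, 100),
    Event("$join", 11, 101),
    Event("$latest_message", 12, 102),
    # Base insertion event of a historical batch: hung off the latest message,
    # so it has the highest depth, but a negative stream ordering.
    Event("$base_insertion", 13, -5),
]

token_stream = 100  # incremental /sync "since" token, taken at the invite
candidates = [e for e in events if e.stream_ordering <= token_stream]

# Old behaviour (roughly): among events at or before the token, take the one
# with the highest topological ordering -- the historical base insertion event
# wins even though it was created long after the token.
print(max(candidates, key=lambda e: e.topological_ordering).event_id)  # $base_insertion

# New behaviour: take the highest stream ordering instead, which is the invite,
# i.e. the event the token actually points at.
print(max(candidates, key=lambda e: e.stream_ordering).event_id)  # $invite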

File tree

5 files changed: +162 -19 lines

changelog.d/12319.bugfix

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Fix bug with incremental sync missing events when rejoining/backfilling. Contributed by Nick @ Beeper.

synapse/handlers/message.py

Lines changed: 5 additions & 9 deletions
@@ -175,17 +175,13 @@ async def get_state_events(
         state_filter = state_filter or StateFilter.all()
 
         if at_token:
-            # FIXME this claims to get the state at a stream position, but
-            # get_recent_events_for_room operates by topo ordering. This therefore
-            # does not reliably give you the state at the given stream position.
-            # (https://github.com/matrix-org/synapse/issues/3305)
-            last_events, _ = await self.store.get_recent_events_for_room(
-                room_id, end_token=at_token.room_key, limit=1
+            last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+                room_id,
+                end_token=at_token.room_key,
             )
 
-            if not last_events:
+            if not last_event:
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
-            last_event = last_events[0]
 
             # check whether the user is in the room at that time to determine
             # whether they should be treated as peeking.
@@ -204,7 +200,7 @@ async def get_state_events(
             visible_events = await filter_events_for_client(
                 self.storage,
                 user_id,
-                last_events,
+                [last_event],
                 filter_send_to_client=False,
                 is_peeking=is_peeking,
             )

synapse/handlers/sync.py

Lines changed: 7 additions & 8 deletions
@@ -661,16 +661,15 @@ async def get_state_at(
             stream_position: point at which to get state
             state_filter: The state filter used to fetch state from the database.
         """
-        # FIXME this claims to get the state at a stream position, but
-        # get_recent_events_for_room operates by topo ordering. This therefore
-        # does not reliably give you the state at the given stream position.
-        # (https://github.com/matrix-org/synapse/issues/3305)
-        last_events, _ = await self.store.get_recent_events_for_room(
-            room_id, end_token=stream_position.room_key, limit=1
+        # FIXME: This gets the state at the latest event before the stream ordering,
+        # which might not be the same as the "current state" of the room at the time
+        # of the stream token if there were multiple forward extremities at the time.
+        last_event = await self.store.get_last_event_in_room_before_stream_ordering(
+            room_id,
+            end_token=stream_position.room_key,
         )
 
-        if last_events:
-            last_event = last_events[-1]
+        if last_event:
             state = await self.get_state_after_event(
                 last_event, state_filter=state_filter or StateFilter.all()
             )
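The replacement FIXME above documents a remaining caveat rather than the original bug: with more than one forward extremity, the state after the single event chosen by stream ordering is not necessarily the resolved "current state" of the room. A toy illustration of the situation it describes (illustrative only, not Synapse code):

# event_id -> (prev_event_ids, stream_ordering)
events = {
    "$base": ([], 1),
    "$branch_a": (["$base"], 2),
    "$branch_b": (["$base"], 3),
}

# Forward extremities are events that no other event references as a prev event.
referenced = {prev for prevs, _ in events.values() for prev in prevs}
forward_extremities = sorted(eid for eid in events if eid not in referenced)
print(forward_extremities)  # ['$branch_a', '$branch_b']

# get_state_at anchors on the event with the highest stream ordering at or
# before the token -- $branch_b here -- so any state that only exists on the
# $branch_a branch is not reflected until the extremities are resolved.
anchor = max(events, key=lambda eid: events[eid][1])
print(anchor)  # $branch_b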

synapse/storage/databases/main/stream.py

Lines changed: 26 additions & 0 deletions
@@ -758,6 +758,32 @@ def _f(txn: LoggingTransaction) -> Optional[Tuple[int, int, str]]:
             "get_room_event_before_stream_ordering", _f
         )
 
+    async def get_last_event_in_room_before_stream_ordering(
+        self,
+        room_id: str,
+        end_token: RoomStreamToken,
+    ) -> Optional[EventBase]:
+        """Returns the last event in a room at or before a stream ordering
+
+        Args:
+            room_id
+            end_token: The token used to stream from
+
+        Returns:
+            The most recent event.
+        """
+
+        last_row = await self.get_room_event_before_stream_ordering(
+            room_id=room_id,
+            stream_ordering=end_token.stream,
+        )
+        if last_row:
+            _, _, event_id = last_row
+            event = await self.get_event(event_id, get_prev_content=True)
+            return event
+
+        return None
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: Optional[str] = None
     ) -> RoomStreamToken:
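For reference, a minimal sketch of how the handler changes above consume this new store method; the wrapper name state_anchor_event is ours for illustration, and store is assumed to be the main datastore exposing the method added in this hunk:

from typing import Optional

from synapse.events import EventBase
from synapse.types import RoomStreamToken


async def state_anchor_event(
    store, room_id: str, token: RoomStreamToken
) -> Optional[EventBase]:
    # Mirrors the call sites in message.py and sync.py: the event whose state
    # gets served for `token` is the last event in the room by stream ordering
    # at or before the token, or None if there is no such event.
    return await store.get_last_event_in_room_before_stream_ordering(
        room_id,
        end_token=token,
    )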

tests/rest/client/test_room_batch.py

Lines changed: 123 additions & 2 deletions
@@ -7,9 +7,9 @@
 from synapse.api.constants import EventContentFields, EventTypes
 from synapse.appservice import ApplicationService
 from synapse.rest import admin
-from synapse.rest.client import login, register, room, room_batch
+from synapse.rest.client import login, register, room, room_batch, sync
 from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, RoomStreamToken
 from synapse.util import Clock
 
 from tests import unittest
@@ -63,6 +63,7 @@ class RoomBatchTestCase(unittest.HomeserverTestCase):
         room.register_servlets,
         register.register_servlets,
         login.register_servlets,
+        sync.register_servlets,
     ]
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
@@ -178,3 +179,123 @@ def test_same_state_groups_for_whole_historical_batch(self) -> None:
             "Expected a single state_group to be returned by saw state_groups=%s"
             % (state_group_map.keys(),),
         )
+
+    @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
+    def test_sync_while_batch_importing(self) -> None:
+        """
+        Make sure that /sync correctly returns full room state when a user joins
+        during ongoing batch backfilling.
+        See: https://github.com/matrix-org/synapse/issues/12281
+        """
+        # Create user who will be invited & join room
+        user_id = self.register_user("beep", "test")
+        user_tok = self.login("beep", "test")
+
+        time_before_room = int(self.clock.time_msec())
+
+        # Create a room with some events
+        room_id, _, _, _ = self._create_test_room()
+        # Invite the user
+        self.helper.invite(
+            room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
+        )
+
+        # Create another room, send a bunch of events to advance the stream token
+        other_room_id = self.helper.create_room_as(
+            self.appservice.sender, tok=self.appservice.token
+        )
+        for _ in range(5):
+            self.helper.send_event(
+                room_id=other_room_id,
+                type=EventTypes.Message,
+                content={"msgtype": "m.text", "body": "C"},
+                tok=self.appservice.token,
+            )
+
+        # Join the room as the normal user
+        self.helper.join(room_id, user_id, tok=user_tok)
+
+        # Create an event to hang the historical batch from - In order to see
+        # the failure case originally reported in #12281, the historical batch
+        # must be hung from the most recent event in the room so the base
+        # insertion event ends up with the highest `topological_ordering`
+        # (`depth`) in the room but will have a negative `stream_ordering`
+        # because it's a `historical` event. Previously, when assembling the
+        # `state` for the `/sync` response, the bugged logic would sort by
+        # `topological_ordering` descending and pick up the base insertion
+        # event because it has a negative `stream_ordering` below the given
+        # pagination token. Now we properly sort by `stream_ordering`
+        # descending, which puts `historical` events with a negative
+        # `stream_ordering` at the bottom, so they aren't selected, as expected.
+        response = self.helper.send_event(
+            room_id=room_id,
+            type=EventTypes.Message,
+            content={
+                "msgtype": "m.text",
+                "body": "C",
+            },
+            tok=self.appservice.token,
+        )
+        event_to_hang_id = response["event_id"]
+
+        channel = self.make_request(
+            "POST",
+            "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
+            % (room_id, event_to_hang_id),
+            content={
+                "events": _create_message_events_for_batch_send_request(
+                    self.virtual_user_id, time_before_room, 3
+                ),
+                "state_events_at_start": _create_join_state_events_for_batch_send_request(
+                    [self.virtual_user_id], time_before_room
+                ),
+            },
+            access_token=self.appservice.token,
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+
+        # Now we need to find the invite + join events stream tokens so we can sync between
+        main_store = self.hs.get_datastores().main
+        events, next_key = self.get_success(
+            main_store.get_recent_events_for_room(
+                room_id,
+                50,
+                end_token=main_store.get_room_max_token(),
+            ),
+        )
+        invite_event_position = None
+        for event in events:
+            if (
+                event.type == "m.room.member"
+                and event.content["membership"] == "invite"
+            ):
+                invite_event_position = self.get_success(
+                    main_store.get_topological_token_for_event(event.event_id)
+                )
+                break
+
+        assert invite_event_position is not None, "No invite event found"
+
+        # Remove the topological order from the token by re-creating w/stream only
+        invite_event_position = RoomStreamToken(None, invite_event_position.stream)
+
+        # Sync everything after this token
+        since_token = self.get_success(invite_event_position.to_string(main_store))
+        sync_response = self.make_request(
+            "GET",
+            f"/sync?since={since_token}",
+            access_token=user_tok,
+        )
+
+        # Assert that, for this room, the user was considered to have joined and thus
+        # receives the full state history
+        state_event_types = [
+            event["type"]
+            for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
+                "events"
+            ]
+        ]
+
+        assert (
+            "m.room.create" in state_event_types
+        ), "Missing room full state in sync response"
