4949 PersistedEventPosition ,
5050 Requester ,
5151 RoomStreamToken ,
52+ SlidingSyncStreamToken ,
5253 StateMap ,
5354 StreamKeyType ,
5455 StreamToken ,
@@ -362,7 +363,7 @@ async def wait_for_sync_for_user(
362363 self ,
363364 requester : Requester ,
364365 sync_config : SlidingSyncConfig ,
365- from_token : Optional [StreamToken ] = None ,
366+ from_token : Optional [SlidingSyncStreamToken ] = None ,
366367 timeout_ms : int = 0 ,
367368 ) -> SlidingSyncResult :
368369 """
@@ -393,7 +394,7 @@ async def wait_for_sync_for_user(
393394 # this returns false, it means we timed out waiting, and we should
394395 # just return an empty response.
395396 before_wait_ts = self .clock .time_msec ()
396- if not await self .notifier .wait_for_stream_token (from_token ):
397+ if not await self .notifier .wait_for_stream_token (from_token . stream_token ):
397398 logger .warning (
398399 "Timed out waiting for worker to catch up. Returning empty response"
399400 )
@@ -431,7 +432,7 @@ async def current_sync_callback(
431432 sync_config .user .to_string (),
432433 timeout_ms ,
433434 current_sync_callback ,
434- from_token = from_token ,
435+ from_token = from_token . stream_token ,
435436 )
436437
437438 return result
@@ -440,7 +441,7 @@ async def current_sync_for_user(
440441 self ,
441442 sync_config : SlidingSyncConfig ,
442443 to_token : StreamToken ,
443- from_token : Optional [StreamToken ] = None ,
444+ from_token : Optional [SlidingSyncStreamToken ] = None ,
444445 ) -> SlidingSyncResult :
445446 """
446447 Generates the response body of a Sliding Sync result, represented as a
@@ -473,7 +474,7 @@ async def current_sync_for_user(
473474 await self .get_room_membership_for_user_at_to_token (
474475 user = sync_config .user ,
475476 to_token = to_token ,
476- from_token = from_token ,
477+ from_token = from_token . stream_token if from_token else None ,
477478 )
478479 )
479480
@@ -631,8 +632,11 @@ async def handle_room(room_id: str) -> None:
631632 to_token = to_token ,
632633 )
633634
635+ # TODO: Update this when we implement per-connection state
636+ connection_token = 0
637+
634638 return SlidingSyncResult (
635- next_pos = to_token ,
639+ next_pos = SlidingSyncStreamToken ( to_token , connection_token ) ,
636640 lists = lists ,
637641 rooms = rooms ,
638642 extensions = extensions ,
@@ -1367,7 +1371,7 @@ async def get_room_sync_data(
13671371 room_id : str ,
13681372 room_sync_config : RoomSyncConfig ,
13691373 room_membership_for_user_at_to_token : _RoomMembershipForUser ,
1370- from_token : Optional [StreamToken ],
1374+ from_token : Optional [SlidingSyncStreamToken ],
13711375 to_token : StreamToken ,
13721376 ) -> SlidingSyncResult .RoomResult :
13731377 """
@@ -1431,7 +1435,7 @@ async def get_room_sync_data(
14311435 # - TODO: For an incremental sync where we haven't sent it down this
14321436 # connection before
14331437 to_bound = (
1434- from_token .room_key
1438+ from_token .stream_token . room_key
14351439 if from_token is not None
14361440 and not room_membership_for_user_at_to_token .newly_joined
14371441 else None
@@ -1498,7 +1502,9 @@ async def get_room_sync_data(
14981502 instance_name = timeline_event .internal_metadata .instance_name ,
14991503 stream = timeline_event .internal_metadata .stream_ordering ,
15001504 )
1501- if persisted_position .persisted_after (from_token .room_key ):
1505+ if persisted_position .persisted_after (
1506+ from_token .stream_token .room_key
1507+ ):
15021508 num_live += 1
15031509 else :
15041510 # Since we're iterating over the timeline events in
@@ -1786,7 +1792,7 @@ async def get_extensions_response(
17861792 self ,
17871793 sync_config : SlidingSyncConfig ,
17881794 to_token : StreamToken ,
1789- from_token : Optional [StreamToken ],
1795+ from_token : Optional [SlidingSyncStreamToken ],
17901796 ) -> SlidingSyncResult .Extensions :
17911797 """Handle extension requests.
17921798
@@ -1900,7 +1906,7 @@ async def get_e2ee_extension_response(
19001906 sync_config : SlidingSyncConfig ,
19011907 e2ee_request : SlidingSyncConfig .Extensions .E2eeExtension ,
19021908 to_token : StreamToken ,
1903- from_token : Optional [StreamToken ],
1909+ from_token : Optional [SlidingSyncStreamToken ],
19041910 ) -> Optional [SlidingSyncResult .Extensions .E2eeExtension ]:
19051911 """Handle E2EE device extension (MSC3884)
19061912
@@ -1922,7 +1928,7 @@ async def get_e2ee_extension_response(
19221928 # TODO: This should take into account the `from_token` and `to_token`
19231929 device_list_updates = await self .device_handler .get_user_ids_changed (
19241930 user_id = user_id ,
1925- from_token = from_token ,
1931+ from_token = from_token . stream_token ,
19261932 )
19271933
19281934 device_one_time_keys_count : Mapping [str , int ] = {}
0 commit comments