@@ -59,7 +59,9 @@ def __init__(self, hs: "HomeServer"):
5959 self .current_max = 0
6060 self .is_processing = False
6161
62- self ._ephemeral_events_linearizer = Linearizer (name = "appservice_ephemeral_events" )
62+ self ._ephemeral_events_linearizer = Linearizer (
63+ name = "appservice_ephemeral_events"
64+ )
6365
6466 def notify_interested_services (self , max_token : RoomStreamToken ) -> None :
6567 """Notifies (pushes) all application services interested in this event.
@@ -260,7 +262,7 @@ async def _notify_interested_services_ephemeral(
260262 )
261263 ):
262264 if stream_key == "receipt_key" :
263- events = await self ._handle_receipts (service )
265+ events = await self ._handle_receipts (service , new_token )
264266 if events :
265267 self .scheduler .submit_ephemeral_events_for_as (
266268 service , events
@@ -272,7 +274,7 @@ async def _notify_interested_services_ephemeral(
272274 )
273275
274276 elif stream_key == "presence_key" :
275- events = await self ._handle_presence (service , users )
277+ events = await self ._handle_presence (service , users , new_token )
276278 if events :
277279 self .scheduler .submit_ephemeral_events_for_as (
278280 service , events
@@ -318,7 +320,9 @@ async def _handle_typing(
318320 )
319321 return typing
320322
321- async def _handle_receipts (self , service : ApplicationService ) -> List [JsonDict ]:
323+ async def _handle_receipts (
324+ self , service : ApplicationService , new_token : Optional [int ]
325+ ) -> List [JsonDict ]:
322326 """
323327 Return the latest read receipts that the given application service should receive.
324328
@@ -337,14 +341,20 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
337341 from_key = await self .store .get_type_stream_id_for_appservice (
338342 service , "read_receipt"
339343 )
344+ if new_token is not None and new_token <= from_key :
345+ raise Exception ("Rejecting token lower than stored: %s" % (new_token ,))
346+
340347 receipts_source = self .event_sources .sources .receipt
341348 receipts , _ = await receipts_source .get_new_events_as (
342349 service = service , from_key = from_key
343350 )
344351 return receipts
345352
346353 async def _handle_presence (
347- self , service : ApplicationService , users : Collection [Union [str , UserID ]]
354+ self ,
355+ service : ApplicationService ,
356+ users : Collection [Union [str , UserID ]],
357+ new_token : Optional [int ],
348358 ) -> List [JsonDict ]:
349359 """
350360 Return the latest presence updates that the given application service should receive.
@@ -367,6 +377,9 @@ async def _handle_presence(
367377 from_key = await self .store .get_type_stream_id_for_appservice (
368378 service , "presence"
369379 )
380+ if new_token is not None and new_token <= from_key :
381+ raise Exception ("Rejecting token lower than stored: %s" % (new_token ,))
382+
370383 for user in users :
371384 if isinstance (user , str ):
372385 user = UserID .from_string (user )
0 commit comments