34
34
)
35
35
from synapse .storage .databases .main .directory import RoomAliasMapping
36
36
from synapse .types import JsonDict , RoomAlias , RoomStreamToken , UserID
37
+ from synapse .util .async_helpers import Linearizer
37
38
from synapse .util .metrics import Measure
38
39
39
40
if TYPE_CHECKING :
@@ -58,6 +59,10 @@ def __init__(self, hs: "HomeServer"):
58
59
self .current_max = 0
59
60
self .is_processing = False
60
61
62
+ self ._ephemeral_events_linearizer = Linearizer (
63
+ name = "appservice_ephemeral_events"
64
+ )
65
+
61
66
def notify_interested_services (self , max_token : RoomStreamToken ) -> None :
62
67
"""Notifies (pushes) all application services interested in this event.
63
68
@@ -260,26 +265,37 @@ async def _notify_interested_services_ephemeral(
260
265
events = await self ._handle_typing (service , new_token )
261
266
if events :
262
267
self .scheduler .submit_ephemeral_events_for_as (service , events )
268
+ continue
263
269
264
- elif stream_key == "receipt_key" :
265
- events = await self ._handle_receipts (service )
266
- if events :
267
- self .scheduler .submit_ephemeral_events_for_as (service , events )
268
-
269
- # Persist the latest handled stream token for this appservice
270
- await self .store .set_type_stream_id_for_appservice (
271
- service , "read_receipt" , new_token
270
+ # Since we read/update the stream position for this AS/stream
271
+ with (
272
+ await self ._ephemeral_events_linearizer .queue (
273
+ (service .id , stream_key )
272
274
)
275
+ ):
276
+ if stream_key == "receipt_key" :
277
+ events = await self ._handle_receipts (service , new_token )
278
+ if events :
279
+ self .scheduler .submit_ephemeral_events_for_as (
280
+ service , events
281
+ )
282
+
283
+ # Persist the latest handled stream token for this appservice
284
+ await self .store .set_type_stream_id_for_appservice (
285
+ service , "read_receipt" , new_token
286
+ )
273
287
274
- elif stream_key == "presence_key" :
275
- events = await self ._handle_presence (service , users )
276
- if events :
277
- self .scheduler .submit_ephemeral_events_for_as (service , events )
288
+ elif stream_key == "presence_key" :
289
+ events = await self ._handle_presence (service , users , new_token )
290
+ if events :
291
+ self .scheduler .submit_ephemeral_events_for_as (
292
+ service , events
293
+ )
278
294
279
- # Persist the latest handled stream token for this appservice
280
- await self .store .set_type_stream_id_for_appservice (
281
- service , "presence" , new_token
282
- )
295
+ # Persist the latest handled stream token for this appservice
296
+ await self .store .set_type_stream_id_for_appservice (
297
+ service , "presence" , new_token
298
+ )
283
299
284
300
async def _handle_typing (
285
301
self , service : ApplicationService , new_token : int
@@ -316,7 +332,9 @@ async def _handle_typing(
316
332
)
317
333
return typing
318
334
319
- async def _handle_receipts (self , service : ApplicationService ) -> List [JsonDict ]:
335
+ async def _handle_receipts (
336
+ self , service : ApplicationService , new_token : Optional [int ]
337
+ ) -> List [JsonDict ]:
320
338
"""
321
339
Return the latest read receipts that the given application service should receive.
322
340
@@ -335,14 +353,23 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
335
353
from_key = await self .store .get_type_stream_id_for_appservice (
336
354
service , "read_receipt"
337
355
)
356
+ if new_token is not None and new_token <= from_key :
357
+ logger .debug (
358
+ "Rejecting token lower than or equal to stored: %s" % (new_token ,)
359
+ )
360
+ return []
361
+
338
362
receipts_source = self .event_sources .sources .receipt
339
363
receipts , _ = await receipts_source .get_new_events_as (
340
364
service = service , from_key = from_key
341
365
)
342
366
return receipts
343
367
344
368
async def _handle_presence (
345
- self , service : ApplicationService , users : Collection [Union [str , UserID ]]
369
+ self ,
370
+ service : ApplicationService ,
371
+ users : Collection [Union [str , UserID ]],
372
+ new_token : Optional [int ],
346
373
) -> List [JsonDict ]:
347
374
"""
348
375
Return the latest presence updates that the given application service should receive.
@@ -365,6 +392,12 @@ async def _handle_presence(
365
392
from_key = await self .store .get_type_stream_id_for_appservice (
366
393
service , "presence"
367
394
)
395
+ if new_token is not None and new_token <= from_key :
396
+ logger .debug (
397
+ "Rejecting token lower than or equal to stored: %s" % (new_token ,)
398
+ )
399
+ return []
400
+
368
401
for user in users :
369
402
if isinstance (user , str ):
370
403
user = UserID .from_string (user )
0 commit comments