@@ -459,6 +459,32 @@ def f(txn: LoggingTransaction) -> List[str]:
459
459
460
460
return await self .db_pool .runInteraction ("get_push_action_users_in_range" , f )
461
461
462
+ def _get_receipts_by_room_txn (
463
+ self , txn : LoggingTransaction , user_id : str
464
+ ) -> List [Tuple [str , int ]]:
465
+ receipt_types_clause , args = make_in_list_sql_clause (
466
+ self .database_engine ,
467
+ "receipt_type" ,
468
+ (
469
+ ReceiptTypes .READ ,
470
+ ReceiptTypes .READ_PRIVATE ,
471
+ ReceiptTypes .UNSTABLE_READ_PRIVATE ,
472
+ ),
473
+ )
474
+
475
+ sql = f"""
476
+ SELECT room_id, MAX(stream_ordering)
477
+ FROM receipts_linearized
478
+ INNER JOIN events USING (room_id, event_id)
479
+ WHERE { receipt_types_clause }
480
+ AND user_id = ?
481
+ GROUP BY room_id
482
+ """
483
+
484
+ args .extend ((user_id ,))
485
+ txn .execute (sql , args )
486
+ return cast (List [Tuple [str , int ]], txn .fetchall ())
487
+
462
488
async def get_unread_push_actions_for_user_in_range_for_http (
463
489
self ,
464
490
user_id : str ,
@@ -482,34 +508,11 @@ async def get_unread_push_actions_for_user_in_range_for_http(
482
508
The list will have between 0~limit entries.
483
509
"""
484
510
485
- def get_receipts_by_room (txn : LoggingTransaction ) -> List [Tuple [str , int ]]:
486
- receipt_types_clause , args = make_in_list_sql_clause (
487
- self .database_engine ,
488
- "receipt_type" ,
489
- (
490
- ReceiptTypes .READ ,
491
- ReceiptTypes .READ_PRIVATE ,
492
- ReceiptTypes .UNSTABLE_READ_PRIVATE ,
493
- ),
494
- )
495
-
496
- sql = f"""
497
- SELECT room_id, MAX(stream_ordering)
498
- FROM receipts_linearized
499
- INNER JOIN events USING (room_id, event_id)
500
- WHERE { receipt_types_clause }
501
- AND user_id = ?
502
- GROUP BY room_id
503
- """
504
-
505
- args .extend ((user_id ,))
506
- txn .execute (sql , args )
507
- return cast (List [Tuple [str , int ]], txn .fetchall ())
508
-
509
511
receipts_by_room = dict (
510
512
await self .db_pool .runInteraction (
511
513
"get_unread_push_actions_for_user_in_range_http_receipts" ,
512
- get_receipts_by_room ,
514
+ self ._get_receipts_by_room_txn ,
515
+ user_id = user_id ,
513
516
),
514
517
)
515
518
@@ -579,94 +582,34 @@ async def get_unread_push_actions_for_user_in_range_for_email(
579
582
The list will have between 0~limit entries.
580
583
"""
581
584
582
- # find rooms that have a read receipt in them and return the most recent
583
- # push actions
584
- def get_after_receipt (
585
- txn : LoggingTransaction ,
586
- ) -> List [Tuple [str , str , int , str , bool , int ]]:
587
- receipt_types_clause , args = make_in_list_sql_clause (
588
- self .database_engine ,
589
- "receipt_type" ,
590
- (
591
- ReceiptTypes .READ ,
592
- ReceiptTypes .READ_PRIVATE ,
593
- ReceiptTypes .UNSTABLE_READ_PRIVATE ,
594
- ),
595
- )
596
-
597
- sql = f"""
598
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
599
- ep.highlight, e.received_ts
600
- FROM (
601
- SELECT room_id,
602
- MAX(stream_ordering) as stream_ordering
603
- FROM events
604
- INNER JOIN receipts_linearized USING (room_id, event_id)
605
- WHERE { receipt_types_clause } AND user_id = ?
606
- GROUP BY room_id
607
- ) AS rl,
608
- event_push_actions AS ep
609
- INNER JOIN events AS e USING (room_id, event_id)
610
- WHERE
611
- ep.room_id = rl.room_id
612
- AND ep.stream_ordering > rl.stream_ordering
613
- AND ep.user_id = ?
614
- AND ep.stream_ordering > ?
615
- AND ep.stream_ordering <= ?
616
- AND ep.notif = 1
617
- ORDER BY ep.stream_ordering DESC LIMIT ?
618
- """
619
- args .extend (
620
- (user_id , user_id , min_stream_ordering , max_stream_ordering , limit )
621
- )
622
- txn .execute (sql , args )
623
- return cast (List [Tuple [str , str , int , str , bool , int ]], txn .fetchall ())
624
-
625
- after_read_receipt = await self .db_pool .runInteraction (
626
- "get_unread_push_actions_for_user_in_range_email_arr" , get_after_receipt
585
+ receipts_by_room = dict (
586
+ await self .db_pool .runInteraction (
587
+ "get_unread_push_actions_for_user_in_range_email_receipts" ,
588
+ self ._get_receipts_by_room_txn ,
589
+ user_id = user_id ,
590
+ ),
627
591
)
628
592
629
- # There are rooms with push actions in them but you don't have a read receipt in
630
- # them e.g. rooms you've been invited to, so get push actions for rooms which do
631
- # not have read receipts in them too.
632
- def get_no_receipt (
593
+ def get_push_actions (
633
594
txn : LoggingTransaction ,
634
595
) -> List [Tuple [str , str , int , str , bool , int ]]:
635
- receipt_types_clause , args = make_in_list_sql_clause (
636
- self .database_engine ,
637
- "receipt_type" ,
638
- (
639
- ReceiptTypes .READ ,
640
- ReceiptTypes .READ_PRIVATE ,
641
- ReceiptTypes .UNSTABLE_READ_PRIVATE ,
642
- ),
643
- )
644
-
645
- sql = f"""
596
+ sql = """
646
597
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
647
598
ep.highlight, e.received_ts
648
599
FROM event_push_actions AS ep
649
600
INNER JOIN events AS e USING (room_id, event_id)
650
601
WHERE
651
- ep.room_id NOT IN (
652
- SELECT room_id FROM receipts_linearized
653
- WHERE { receipt_types_clause } AND user_id = ?
654
- GROUP BY room_id
655
- )
656
- AND ep.user_id = ?
602
+ ep.user_id = ?
657
603
AND ep.stream_ordering > ?
658
604
AND ep.stream_ordering <= ?
659
605
AND ep.notif = 1
660
606
ORDER BY ep.stream_ordering DESC LIMIT ?
661
607
"""
662
- args .extend (
663
- (user_id , user_id , min_stream_ordering , max_stream_ordering , limit )
664
- )
665
- txn .execute (sql , args )
608
+ txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
666
609
return cast (List [Tuple [str , str , int , str , bool , int ]], txn .fetchall ())
667
610
668
- no_read_receipt = await self .db_pool .runInteraction (
669
- "get_unread_push_actions_for_user_in_range_email_nrr " , get_no_receipt
611
+ push_actions = await self .db_pool .runInteraction (
612
+ "get_unread_push_actions_for_user_in_range_email " , get_push_actions
670
613
)
671
614
672
615
# Make a list of dicts from the two sets of results.
@@ -678,7 +621,10 @@ def get_no_receipt(
678
621
actions = _deserialize_action (row [3 ], row [4 ]),
679
622
received_ts = row [5 ],
680
623
)
681
- for row in after_read_receipt + no_read_receipt
624
+ for row in push_actions
625
+ # Only include push actions with a stream ordering after any receipt, or without any
626
+ # receipt present (invited to but never read rooms).
627
+ if row [2 ] > receipts_by_room .get (row [1 ], 0 )
682
628
]
683
629
684
630
# Now sort it so it's ordered correctly, since currently it will
0 commit comments