9595 DatabasePool ,
9696 LoggingDatabaseConnection ,
9797 LoggingTransaction ,
98+ PostgresEngine ,
9899)
99100from synapse .storage .databases .main .receipts import ReceiptsWorkerStore
100101from synapse .storage .databases .main .stream import StreamWorkerStore
@@ -427,8 +428,8 @@ def _get_unread_counts_by_pos_txn(
427428 room_id: The room ID to get unread counts for.
428429 user_id: The user ID to get unread counts for.
429430 receipt_stream_ordering: The stream ordering of the user's latest
430- receipt in the room. If there are no receipts, the stream ordering
431- of the user's join event.
431+ unthreaded receipt in the room. If there are no unthreaded receipts,
432+ the stream ordering of the user's join event.
432433
433434 Returns:
434435 A RoomNotifCounts object containing the notification count, the
@@ -444,6 +445,20 @@ def _get_thread(thread_id: str) -> NotifCounts:
444445 return main_counts
445446 return thread_counts .setdefault (thread_id , NotifCounts ())
446447
448+ receipt_types_clause , receipts_args = make_in_list_sql_clause (
449+ self .database_engine ,
450+ "receipt_type" ,
451+ (ReceiptTypes .READ , ReceiptTypes .READ_PRIVATE ),
452+ )
453+
454+ # PostgreSQL and SQLite differ in comparing scalar numerics.
455+ if isinstance (self .database_engine , PostgresEngine ):
456+ # GREATEST ignores NULLs.
457+ receipt_stream_clause = "GREATEST(receipt_stream_ordering, ?)"
458+ else :
459+ # MAX returns NULL if any are NULL, so COALESCE to 0 first.
460+ receipt_stream_clause = "MAX(COALESCE(receipt_stream_ordering, 0), ?)"
461+
447462 # First we pull the counts from the summary table.
448463 #
449464 # We check that `last_receipt_stream_ordering` matches the stream
@@ -458,57 +473,151 @@ def _get_thread(thread_id: str) -> NotifCounts:
458473 # updated `event_push_summary` synchronously when persisting a new read
459474 # receipt).
460475 txn .execute (
461- """
462- SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id
476+ f """
477+ SELECT notif_count, COALESCE(unread_count, 0), thread_id
463478 FROM event_push_summary
479+ LEFT JOIN (
480+ SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
481+ FROM receipts_linearized
482+ LEFT JOIN events USING (room_id, event_id)
483+ WHERE
484+ user_id = ?
485+ AND room_id = ?
486+ AND { receipt_types_clause }
487+ GROUP BY thread_id
488+ ) AS receipts USING (thread_id)
464489 WHERE room_id = ? AND user_id = ?
465490 AND (
466- (last_receipt_stream_ordering IS NULL AND stream_ordering > ? )
467- OR last_receipt_stream_ordering = ?
491+ (last_receipt_stream_ordering IS NULL AND stream_ordering > { receipt_stream_clause } )
492+ OR last_receipt_stream_ordering = { receipt_stream_clause }
468493 ) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0)
469494 """ ,
470- (room_id , user_id , receipt_stream_ordering , receipt_stream_ordering ),
495+ (
496+ user_id ,
497+ room_id ,
498+ * receipts_args ,
499+ room_id ,
500+ user_id ,
501+ receipt_stream_ordering ,
502+ receipt_stream_ordering ,
503+ ),
471504 )
472- max_summary_stream_ordering = 0
473- for summary_stream_ordering , notif_count , unread_count , thread_id in txn :
505+ summarised_threads = set ()
506+ for notif_count , unread_count , thread_id in txn :
507+ summarised_threads .add (thread_id )
474508 counts = _get_thread (thread_id )
475509 counts .notify_count += notif_count
476510 counts .unread_count += unread_count
477511
478- # Summaries will only be used if they have not been invalidated by
479- # a recent receipt; track the latest stream ordering or a valid summary.
480- #
481- # Note that since there's only one read receipt in the room per user,
482- # valid summaries are contiguous.
483- max_summary_stream_ordering = max (
484- summary_stream_ordering , max_summary_stream_ordering
485- )
486-
487512 # Next we need to count highlights, which aren't summarised
488- sql = """
513+ sql = f """
489514 SELECT COUNT(*), thread_id FROM event_push_actions
515+ LEFT JOIN (
516+ SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
517+ FROM receipts_linearized
518+ LEFT JOIN events USING (room_id, event_id)
519+ WHERE
520+ user_id = ?
521+ AND room_id = ?
522+ AND { receipt_types_clause }
523+ GROUP BY thread_id
524+ ) AS receipts USING (thread_id)
490525 WHERE user_id = ?
491526 AND room_id = ?
492- AND stream_ordering > ?
527+ AND stream_ordering > { receipt_stream_clause }
493528 AND highlight = 1
494529 GROUP BY thread_id
495530 """
496- txn .execute (sql , (user_id , room_id , receipt_stream_ordering ))
531+ txn .execute (
532+ sql ,
533+ (
534+ user_id ,
535+ room_id ,
536+ * receipts_args ,
537+ user_id ,
538+ room_id ,
539+ receipt_stream_ordering ,
540+ ),
541+ )
497542 for highlight_count , thread_id in txn :
498543 _get_thread (thread_id ).highlight_count += highlight_count
499544
545+ # For threads which were summarised we need to count actions since the last
546+ # rotation.
547+ thread_id_clause , thread_id_args = make_in_list_sql_clause (
548+ self .database_engine , "thread_id" , summarised_threads
549+ )
550+
551+ # The (inclusive) event stream ordering that was previously summarised.
552+ rotated_upto_stream_ordering = self .db_pool .simple_select_one_onecol_txn (
553+ txn ,
554+ table = "event_push_summary_stream_ordering" ,
555+ keyvalues = {},
556+ retcol = "stream_ordering" ,
557+ )
558+
559+ unread_counts = self ._get_notif_unread_count_for_user_room (
560+ txn , room_id , user_id , rotated_upto_stream_ordering
561+ )
562+ for notif_count , unread_count , thread_id in unread_counts :
563+ if thread_id not in summarised_threads :
564+ continue
565+
566+ if thread_id == "main" :
567+ counts .notify_count += notif_count
568+ counts .unread_count += unread_count
569+ elif thread_id in thread_counts :
570+ thread_counts [thread_id ].notify_count += notif_count
571+ thread_counts [thread_id ].unread_count += unread_count
572+ else :
573+ # Previous thread summaries of 0 are discarded above.
574+ #
575+ # TODO If empty summaries are deleted this can be removed.
576+ thread_counts [thread_id ] = NotifCounts (
577+ notify_count = notif_count ,
578+ unread_count = unread_count ,
579+ highlight_count = 0 ,
580+ )
581+
500582 # Finally we need to count push actions that aren't included in the
501583 # summary returned above. This might be due to recent events that haven't
502584 # been summarised yet or the summary is out of date due to a recent read
503585 # receipt.
504- start_unread_stream_ordering = max (
505- receipt_stream_ordering , max_summary_stream_ordering
506- )
507- unread_counts = self ._get_notif_unread_count_for_user_room (
508- txn , room_id , user_id , start_unread_stream_ordering
586+ sql = f"""
587+ SELECT
588+ COUNT(CASE WHEN notif = 1 THEN 1 END),
589+ COUNT(CASE WHEN unread = 1 THEN 1 END),
590+ thread_id
591+ FROM event_push_actions
592+ LEFT JOIN (
593+ SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering
594+ FROM receipts_linearized
595+ LEFT JOIN events USING (room_id, event_id)
596+ WHERE
597+ user_id = ?
598+ AND room_id = ?
599+ AND { receipt_types_clause }
600+ GROUP BY thread_id
601+ ) AS receipts USING (thread_id)
602+ WHERE user_id = ?
603+ AND room_id = ?
604+ AND stream_ordering > { receipt_stream_clause }
605+ AND NOT { thread_id_clause }
606+ GROUP BY thread_id
607+ """
608+ txn .execute (
609+ sql ,
610+ (
611+ user_id ,
612+ room_id ,
613+ * receipts_args ,
614+ user_id ,
615+ room_id ,
616+ receipt_stream_ordering ,
617+ * thread_id_args ,
618+ ),
509619 )
510-
511- for notif_count , unread_count , thread_id in unread_counts :
620+ for notif_count , unread_count , thread_id in txn :
512621 counts = _get_thread (thread_id )
513622 counts .notify_count += notif_count
514623 counts .unread_count += unread_count
@@ -522,6 +631,7 @@ def _get_notif_unread_count_for_user_room(
522631 user_id : str ,
523632 stream_ordering : int ,
524633 max_stream_ordering : Optional [int ] = None ,
634+ thread_id : Optional [str ] = None ,
525635 ) -> List [Tuple [int , int , str ]]:
526636 """Returns the notify and unread counts from `event_push_actions` for
527637 the given user/room in the given range.
@@ -547,17 +657,22 @@ def _get_notif_unread_count_for_user_room(
547657 if not self ._events_stream_cache .has_entity_changed (room_id , stream_ordering ):
548658 return []
549659
550- clause = ""
660+ stream_ordering_clause = ""
551661 args = [user_id , room_id , stream_ordering ]
552662 if max_stream_ordering is not None :
553- clause = "AND ea.stream_ordering <= ?"
663+ stream_ordering_clause = "AND ea.stream_ordering <= ?"
554664 args .append (max_stream_ordering )
555665
556666 # If the max stream ordering is less than the min stream ordering,
557667 # then obviously there are zero push actions in that range.
558668 if max_stream_ordering <= stream_ordering :
559669 return []
560670
671+ thread_id_clause = ""
672+ if thread_id is not None :
673+ thread_id_clause = "AND thread_id = ?"
674+ args .append (thread_id )
675+
561676 sql = f"""
562677 SELECT
563678 COUNT(CASE WHEN notif = 1 THEN 1 END),
@@ -567,7 +682,8 @@ def _get_notif_unread_count_for_user_room(
567682 WHERE user_id = ?
568683 AND room_id = ?
569684 AND ea.stream_ordering > ?
570- { clause }
685+ { stream_ordering_clause }
686+ { thread_id_clause }
571687 GROUP BY thread_id
572688 """
573689
@@ -1073,7 +1189,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
10731189 )
10741190
10751191 sql = """
1076- SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
1192+ SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
10771193 FROM receipts_linearized AS r
10781194 INNER JOIN events AS e USING (event_id)
10791195 WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
@@ -1096,13 +1212,18 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
10961212 )
10971213 rows = txn .fetchall ()
10981214
1099- # For each new read receipt we delete push actions from before it and
1100- # recalculate the summary .
1101- for _ , room_id , user_id , stream_ordering in rows :
1215+ # First handle all the rows without a thread ID (i.e. ones that apply to
1216+ # the entire room) .
1217+ for _ , room_id , user_id , thread_id , stream_ordering in rows :
11021218 # Only handle our own read receipts.
11031219 if not self .hs .is_mine_id (user_id ):
11041220 continue
11051221
1222+ if thread_id is not None :
1223+ continue
1224+
1225+ # For each new read receipt we delete push actions from before it and
1226+ # recalculate the summary.
11061227 txn .execute (
11071228 """
11081229 DELETE FROM event_push_actions
@@ -1144,6 +1265,64 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
11441265 value_values = [(row [0 ], row [1 ]) for row in unread_counts ],
11451266 )
11461267
1268+ # For each new read receipt we delete push actions from before it and
1269+ # recalculate the summary.
1270+ for _ , room_id , user_id , thread_id , stream_ordering in rows :
1271+ # Only handle our own read receipts.
1272+ if not self .hs .is_mine_id (user_id ):
1273+ continue
1274+
1275+ if thread_id is None :
1276+ continue
1277+
1278+ # For each new read receipt we delete push actions from before it and
1279+ # recalculate the summary.
1280+ txn .execute (
1281+ """
1282+ DELETE FROM event_push_actions
1283+ WHERE room_id = ?
1284+ AND user_id = ?
1285+ AND thread_id = ?
1286+ AND stream_ordering <= ?
1287+ AND highlight = 0
1288+ """ ,
1289+ (room_id , user_id , thread_id , stream_ordering ),
1290+ )
1291+
1292+ # Fetch the notification counts between the stream ordering of the
1293+ # latest receipt and what was previously summarised.
1294+ unread_counts = self ._get_notif_unread_count_for_user_room (
1295+ txn ,
1296+ room_id ,
1297+ user_id ,
1298+ stream_ordering ,
1299+ old_rotate_stream_ordering ,
1300+ thread_id ,
1301+ )
1302+ # unread_counts will be a list of 0 or 1 items.
1303+ if unread_counts :
1304+ notif_count , unread_count , _ = unread_counts [0 ]
1305+ else :
1306+ notif_count = 0
1307+ unread_count = 0
1308+
1309+ # Update the summary of this specific thread.
1310+ self .db_pool .simple_upsert_txn (
1311+ txn ,
1312+ table = "event_push_summary" ,
1313+ keyvalues = {
1314+ "room_id" : room_id ,
1315+ "user_id" : user_id ,
1316+ "thread_id" : thread_id ,
1317+ },
1318+ values = {
1319+ "notif_count" : notif_count ,
1320+ "unread_count" : unread_count ,
1321+ "stream_ordering" : old_rotate_stream_ordering ,
1322+ "last_receipt_stream_ordering" : stream_ordering ,
1323+ },
1324+ )
1325+
11471326 # We always update `event_push_summary_last_receipt_stream_id` to
11481327 # ensure that we don't rescan the same receipts for remote users.
11491328
0 commit comments