@@ -540,7 +540,9 @@ def _get_users_sent_receipts_between_txn(txn: LoggingTransaction) -> List[str]:
540540
541541 async def get_all_updated_receipts (
542542 self , instance_name : str , last_id : int , current_id : int , limit : int
543- ) -> Tuple [List [Tuple [int , list ]], int , bool ]:
543+ ) -> Tuple [
544+ List [Tuple [int , Tuple [str , str , str , str , Optional [str ], JsonDict ]]], int , bool
545+ ]:
544546 """Get updates for receipts replication stream.
545547
546548 Args:
@@ -567,9 +569,13 @@ async def get_all_updated_receipts(
567569
568570 def get_all_updated_receipts_txn (
569571 txn : LoggingTransaction ,
570- ) -> Tuple [List [Tuple [int , list ]], int , bool ]:
572+ ) -> Tuple [
573+ List [Tuple [int , Tuple [str , str , str , str , Optional [str ], JsonDict ]]],
574+ int ,
575+ bool ,
576+ ]:
571577 sql = """
572- SELECT stream_id, room_id, receipt_type, user_id, event_id, data
578+ SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
573579 FROM receipts_linearized
574580 WHERE ? < stream_id AND stream_id <= ?
575581 ORDER BY stream_id ASC
@@ -578,8 +584,8 @@ def get_all_updated_receipts_txn(
578584 txn .execute (sql , (last_id , current_id , limit ))
579585
580586 updates = cast (
581- List [Tuple [int , list ]],
582- [(r [0 ], r [1 :5 ] + (db_to_json (r [5 ]),)) for r in txn ],
587+ List [Tuple [int , Tuple [ str , str , str , str , Optional [ str ], JsonDict ] ]],
588+ [(r [0 ], r [1 :6 ] + (db_to_json (r [6 ]),)) for r in txn ],
583589 )
584590
585591 limited = False
@@ -631,6 +637,7 @@ def _insert_linearized_receipt_txn(
631637 receipt_type : str ,
632638 user_id : str ,
633639 event_id : str ,
640+ thread_id : Optional [str ],
634641 data : JsonDict ,
635642 stream_id : int ,
636643 ) -> Optional [int ]:
@@ -657,12 +664,27 @@ def _insert_linearized_receipt_txn(
657664 # We don't want to clobber receipts for more recent events, so we
658665 # have to compare orderings of existing receipts
659666 if stream_ordering is not None :
660- sql = (
661- "SELECT stream_ordering, event_id FROM events"
662- " INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
663- " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
667+ if thread_id is None :
668+ thread_clause = "r.thread_id IS NULL"
669+ thread_args : Tuple [str , ...] = ()
670+ else :
671+ thread_clause = "r.thread_id = ?"
672+ thread_args = (thread_id ,)
673+
674+ sql = f"""
675+ SELECT stream_ordering, event_id FROM events
676+ INNER JOIN receipts_linearized AS r USING (event_id, room_id)
677+ WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND { thread_clause }
678+ """
679+ txn .execute (
680+ sql ,
681+ (
682+ room_id ,
683+ receipt_type ,
684+ user_id ,
685+ )
686+ + thread_args ,
664687 )
665- txn .execute (sql , (room_id , receipt_type , user_id ))
666688
667689 for so , eid in txn :
668690 if int (so ) >= stream_ordering :
@@ -682,21 +704,28 @@ def _insert_linearized_receipt_txn(
682704 self ._receipts_stream_cache .entity_has_changed , room_id , stream_id
683705 )
684706
707+ keyvalues = {
708+ "room_id" : room_id ,
709+ "receipt_type" : receipt_type ,
710+ "user_id" : user_id ,
711+ }
712+ where_clause = ""
713+ if thread_id is None :
714+ where_clause = "thread_id IS NULL"
715+ else :
716+ keyvalues ["thread_id" ] = thread_id
717+
685718 self .db_pool .simple_upsert_txn (
686719 txn ,
687720 table = "receipts_linearized" ,
688- keyvalues = {
689- "room_id" : room_id ,
690- "receipt_type" : receipt_type ,
691- "user_id" : user_id ,
692- },
721+ keyvalues = keyvalues ,
693722 values = {
694723 "stream_id" : stream_id ,
695724 "event_id" : event_id ,
696725 "event_stream_ordering" : stream_ordering ,
697726 "data" : json_encoder .encode (data ),
698- "thread_id" : None ,
699727 },
728+ where_clause = where_clause ,
700729 # receipts_linearized has a unique constraint on
701730 # (user_id, room_id, receipt_type), so no need to lock
702731 lock = False ,
@@ -748,6 +777,7 @@ async def insert_receipt(
748777 receipt_type : str ,
749778 user_id : str ,
750779 event_ids : List [str ],
780+ thread_id : Optional [str ],
751781 data : dict ,
752782 ) -> Optional [Tuple [int , int ]]:
753783 """Insert a receipt, either from local client or remote server.
@@ -780,6 +810,7 @@ async def insert_receipt(
780810 receipt_type ,
781811 user_id ,
782812 linearized_event_id ,
813+ thread_id ,
783814 data ,
784815 stream_id = stream_id ,
785816 # Read committed is actually beneficial here because we check for a receipt with
@@ -794,7 +825,8 @@ async def insert_receipt(
794825
795826 now = self ._clock .time_msec ()
796827 logger .debug (
797- "RR for event %s in %s (%i ms old)" ,
828+ "Receipt %s for event %s in %s (%i ms old)" ,
829+ receipt_type ,
798830 linearized_event_id ,
799831 room_id ,
800832 now - event_ts ,
@@ -807,6 +839,7 @@ async def insert_receipt(
807839 receipt_type ,
808840 user_id ,
809841 event_ids ,
842+ thread_id ,
810843 data ,
811844 )
812845
@@ -821,6 +854,7 @@ def _insert_graph_receipt_txn(
821854 receipt_type : str ,
822855 user_id : str ,
823856 event_ids : List [str ],
857+ thread_id : Optional [str ],
824858 data : JsonDict ,
825859 ) -> None :
826860 assert self ._can_write_to_receipts
@@ -832,19 +866,26 @@ def _insert_graph_receipt_txn(
832866 # FIXME: This shouldn't invalidate the whole cache
833867 txn .call_after (self ._get_linearized_receipts_for_room .invalidate , (room_id ,))
834868
869+ keyvalues = {
870+ "room_id" : room_id ,
871+ "receipt_type" : receipt_type ,
872+ "user_id" : user_id ,
873+ }
874+ where_clause = ""
875+ if thread_id is None :
876+ where_clause = "thread_id IS NULL"
877+ else :
878+ keyvalues ["thread_id" ] = thread_id
879+
835880 self .db_pool .simple_upsert_txn (
836881 txn ,
837882 table = "receipts_graph" ,
838- keyvalues = {
839- "room_id" : room_id ,
840- "receipt_type" : receipt_type ,
841- "user_id" : user_id ,
842- },
883+ keyvalues = keyvalues ,
843884 values = {
844885 "event_ids" : json_encoder .encode (event_ids ),
845886 "data" : json_encoder .encode (data ),
846- "thread_id" : None ,
847887 },
888+ where_clause = where_clause ,
848889 # receipts_graph has a unique constraint on
849890 # (user_id, room_id, receipt_type), so no need to lock
850891 lock = False ,
0 commit comments