@@ -637,6 +637,7 @@ def _insert_linearized_receipt_txn(
         receipt_type: str,
         user_id: str,
         event_id: str,
+        thread_id: Optional[str],
         data: JsonDict,
         stream_id: int,
     ) -> Optional[int]:
@@ -663,12 +664,27 @@ def _insert_linearized_receipt_txn(
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
         if stream_ordering is not None:
-            sql = (
-                "SELECT stream_ordering, event_id FROM events"
-                " INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
-                " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+            if thread_id is None:
+                thread_clause = "r.thread_id IS NULL"
+                thread_args: Tuple[str, ...] = ()
+            else:
+                thread_clause = "r.thread_id = ?"
+                thread_args = (thread_id,)
+
+            sql = f"""
+            SELECT stream_ordering, event_id FROM events
+            INNER JOIN receipts_linearized AS r USING (event_id, room_id)
+            WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
+            """
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    receipt_type,
+                    user_id,
+                )
+                + thread_args,
             )
-            txn.execute(sql, (room_id, receipt_type, user_id))

             for so, eid in txn:
                 if int(so) >= stream_ordering:
@@ -688,21 +704,28 @@ def _insert_linearized_receipt_txn(
             self._receipts_stream_cache.entity_has_changed, room_id, stream_id
         )

+        keyvalues = {
+            "room_id": room_id,
+            "receipt_type": receipt_type,
+            "user_id": user_id,
+        }
+        where_clause = ""
+        if thread_id is None:
+            where_clause = "thread_id IS NULL"
+        else:
+            keyvalues["thread_id"] = thread_id
+
         self.db_pool.simple_upsert_txn(
             txn,
             table="receipts_linearized",
-            keyvalues={
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
-            },
+            keyvalues=keyvalues,
             values={
                 "stream_id": stream_id,
                 "event_id": event_id,
                 "event_stream_ordering": stream_ordering,
                 "data": json_encoder.encode(data),
-                "thread_id": None,
             },
+            where_clause=where_clause,
             # receipts_linearized has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock
             lock=False,
@@ -754,6 +777,7 @@ async def insert_receipt(
         receipt_type: str,
         user_id: str,
         event_ids: List[str],
+        thread_id: Optional[str],
         data: dict,
     ) -> Optional[Tuple[int, int]]:
         """Insert a receipt, either from local client or remote server.
@@ -786,6 +810,7 @@ async def insert_receipt(
                 receipt_type,
                 user_id,
                 linearized_event_id,
+                thread_id,
                 data,
                 stream_id=stream_id,
                 # Read committed is actually beneficial here because we check for a receipt with
@@ -800,7 +825,8 @@ async def insert_receipt(

         now = self._clock.time_msec()
         logger.debug(
-            "RR for event %s in %s (%i ms old)",
+            "Receipt %s for event %s in %s (%i ms old)",
+            receipt_type,
             linearized_event_id,
             room_id,
             now - event_ts,
@@ -813,6 +839,7 @@ async def insert_receipt(
             receipt_type,
             user_id,
             event_ids,
+            thread_id,
             data,
         )

@@ -827,6 +854,7 @@ def _insert_graph_receipt_txn(
         receipt_type: str,
         user_id: str,
         event_ids: List[str],
+        thread_id: Optional[str],
         data: JsonDict,
     ) -> None:
         assert self._can_write_to_receipts
@@ -838,19 +866,26 @@ def _insert_graph_receipt_txn(
         # FIXME: This shouldn't invalidate the whole cache
         txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

+        keyvalues = {
+            "room_id": room_id,
+            "receipt_type": receipt_type,
+            "user_id": user_id,
+        }
+        where_clause = ""
+        if thread_id is None:
+            where_clause = "thread_id IS NULL"
+        else:
+            keyvalues["thread_id"] = thread_id
+
         self.db_pool.simple_upsert_txn(
             txn,
             table="receipts_graph",
-            keyvalues={
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
-            },
+            keyvalues=keyvalues,
             values={
                 "event_ids": json_encoder.encode(event_ids),
                 "data": json_encoder.encode(data),
-                "thread_id": None,
             },
+            where_clause=where_clause,
             # receipts_graph has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock
             lock=False,
0 commit comments