@@ -597,7 +597,7 @@ def process_replication_rows(
597597
598598 return super ().process_replication_rows (stream_name , instance_name , token , rows )
599599
600- def insert_linearized_receipt_txn (
600+ def _insert_linearized_receipt_txn (
601601 self ,
602602 txn : LoggingTransaction ,
603603 room_id : str ,
@@ -686,6 +686,44 @@ def insert_linearized_receipt_txn(
686686
687687 return rx_ts
688688
689+ def _graph_to_linear (
690+ self , txn : LoggingTransaction , room_id : str , event_ids : List [str ]
691+ ) -> str :
692+ """
693+ Generate a linearized event from a list of events (i.e. a list of forward
694+ extremities in the room).
695+
696+ This should allow for calculation of the correct read receipt even if
697+ servers have different event ordering.
698+
699+ Args:
700+ txn: The transaction
701+ room_id: The room ID the events are in.
702+ event_ids: The list of event IDs to linearize.
703+
704+ Returns:
705+ The linearized event ID.
706+ """
707+ # TODO: Make this better.
708+ clause , args = make_in_list_sql_clause (
709+ self .database_engine , "event_id" , event_ids
710+ )
711+
712+ sql = """
713+ SELECT event_id WHERE room_id = ? AND stream_ordering IN (
714+ SELECT max(stream_ordering) WHERE %s
715+ )
716+ """ % (
717+ clause ,
718+ )
719+
720+ txn .execute (sql , [room_id ] + list (args ))
721+ rows = txn .fetchall ()
722+ if rows :
723+ return rows [0 ][0 ]
724+ else :
725+ raise RuntimeError ("Unrecognized event_ids: %r" % (event_ids ,))
726+
689727 async def insert_receipt (
690728 self ,
691729 room_id : str ,
@@ -712,35 +750,14 @@ async def insert_receipt(
712750 linearized_event_id = event_ids [0 ]
713751 else :
714752 # we need to points in graph -> linearized form.
715- # TODO: Make this better.
716- def graph_to_linear (txn : LoggingTransaction ) -> str :
717- clause , args = make_in_list_sql_clause (
718- self .database_engine , "event_id" , event_ids
719- )
720-
721- sql = """
722- SELECT event_id WHERE room_id = ? AND stream_ordering IN (
723- SELECT max(stream_ordering) WHERE %s
724- )
725- """ % (
726- clause ,
727- )
728-
729- txn .execute (sql , [room_id ] + list (args ))
730- rows = txn .fetchall ()
731- if rows :
732- return rows [0 ][0 ]
733- else :
734- raise RuntimeError ("Unrecognized event_ids: %r" % (event_ids ,))
735-
736753 linearized_event_id = await self .db_pool .runInteraction (
737- "insert_receipt_conv" , graph_to_linear
754+ "insert_receipt_conv" , self . _graph_to_linear , room_id , event_ids
738755 )
739756
740757 async with self ._receipts_id_gen .get_next () as stream_id : # type: ignore[attr-defined]
741758 event_ts = await self .db_pool .runInteraction (
742759 "insert_linearized_receipt" ,
743- self .insert_linearized_receipt_txn ,
760+ self ._insert_linearized_receipt_txn ,
744761 room_id ,
745762 receipt_type ,
746763 user_id ,
@@ -761,33 +778,21 @@ def graph_to_linear(txn: LoggingTransaction) -> str:
761778 now - event_ts ,
762779 )
763780
764- await self .insert_graph_receipt (room_id , receipt_type , user_id , event_ids , data )
765-
766- max_persisted_id = self ._receipts_id_gen .get_current_token ()
767-
768- return stream_id , max_persisted_id
769-
770- async def insert_graph_receipt (
771- self ,
772- room_id : str ,
773- receipt_type : str ,
774- user_id : str ,
775- event_ids : List [str ],
776- data : JsonDict ,
777- ) -> None :
778- assert self ._can_write_to_receipts
779-
780781 await self .db_pool .runInteraction (
781782 "insert_graph_receipt" ,
782- self .insert_graph_receipt_txn ,
783+ self ._insert_graph_receipt_txn ,
783784 room_id ,
784785 receipt_type ,
785786 user_id ,
786787 event_ids ,
787788 data ,
788789 )
789790
790- def insert_graph_receipt_txn (
791+ max_persisted_id = self ._receipts_id_gen .get_current_token ()
792+
793+ return stream_id , max_persisted_id
794+
795+ def _insert_graph_receipt_txn (
791796 self ,
792797 txn : LoggingTransaction ,
793798 room_id : str ,
0 commit comments