 import logging
 from typing import Dict, List, Optional, Tuple
 
+import attr
+
 from synapse.api.constants import EventContentFields
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import make_event_from_dict
 
 logger = logging.getLogger(__name__)
 
 
+@attr.s(slots=True, frozen=True)
+class _CalculateChainCover:
+    """Return value for _calculate_chain_cover_txn."""
+
+    # The last room_id/depth/stream processed.
+    room_id = attr.ib(type=str)
+    depth = attr.ib(type=int)
+    stream = attr.ib(type=int)
+
+    # Number of rows processed
+    processed_count = attr.ib(type=int)
+
+    # Map from room_id to last depth/stream processed for each room that we have
+    # processed all events for (i.e. the rooms we can flip the
+    # `has_auth_chain_index` for)
+    finished_room_map = attr.ib(type=Dict[str, Tuple[int, int]])
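+    # For illustration only (hypothetical values): a batch might return
+    # _CalculateChainCover(room_id="!c:example.com", depth=10, stream=250,
+    #                      processed_count=100,
+    #                      finished_room_map={"!a:example.com": (3, 17)})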
+
+
 class EventsBackgroundUpdatesStore(SQLBaseStore):
 
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
@@ -719,138 +740,29 @@ async def _chain_cover_index(self, progress: dict, batch_size: int) -> int:
 
         current_room_id = progress.get("current_room_id", "")
 
-        # Have we finished processing the current room.
-        finished = progress.get("finished", True)
-
         # Where we've processed up to in the room, defaults to the start of the
         # room.
         last_depth = progress.get("last_depth", -1)
         last_stream = progress.get("last_stream", -1)
 
-        # Have we set the `has_auth_chain_index` for the room yet.
-        has_set_room_has_chain_index = progress.get(
-            "has_set_room_has_chain_index", False
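+        # Scan forward from the saved (room, depth, stream) position. With
+        # `single_room=False` the batch is allowed to span room boundaries.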
+        result = await self.db_pool.runInteraction(
+            "_chain_cover_index",
+            self._calculate_chain_cover_txn,
+            current_room_id,
+            last_depth,
+            last_stream,
+            batch_size,
+            single_room=False,
         )
 
-        if finished:
-            # If we've finished with the previous room (or its our first
-            # iteration) we move on to the next room.
-
-            def _get_next_room(txn: Cursor) -> Optional[str]:
-                sql = """
-                    SELECT room_id FROM rooms
-                    WHERE room_id > ?
-                        AND (
-                            NOT has_auth_chain_index
-                            OR has_auth_chain_index IS NULL
-                        )
-                    ORDER BY room_id
-                    LIMIT 1
-                """
-                txn.execute(sql, (current_room_id,))
-                row = txn.fetchone()
-                if row:
-                    return row[0]
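+        # If the batch came back empty then every event already has its chain
+        # cover, so the background update can be marked as finished below.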
+        finished = result.processed_count == 0
 
-                return None
-
-            current_room_id = await self.db_pool.runInteraction(
-                "_chain_cover_index", _get_next_room
-            )
-            if not current_room_id:
-                await self.db_pool.updates._end_background_update("chain_cover")
-                return 0
-
-            logger.debug("Adding chain cover to %s", current_room_id)
-
-        def _calculate_auth_chain(
-            txn: Cursor, last_depth: int, last_stream: int
-        ) -> Tuple[int, int, int]:
-            # Get the next set of events in the room (that we haven't already
-            # computed chain cover for). We do this in topological order.
-
-            # We want to do a `(topological_ordering, stream_ordering) > (?,?)`
-            # comparison, but that is not supported on older SQLite versions
-            tuple_clause, tuple_args = make_tuple_comparison_clause(
-                self.database_engine,
-                [
-                    ("topological_ordering", last_depth),
-                    ("stream_ordering", last_stream),
-                ],
-            )
+        total_rows_processed = result.processed_count
+        current_room_id = result.room_id
+        last_depth = result.depth
+        last_stream = result.stream
 
-            sql = """
-                SELECT
-                    event_id, state_events.type, state_events.state_key,
-                    topological_ordering, stream_ordering
-                FROM events
-                INNER JOIN state_events USING (event_id)
-                LEFT JOIN event_auth_chains USING (event_id)
-                LEFT JOIN event_auth_chain_to_calculate USING (event_id)
-                WHERE events.room_id = ?
-                    AND event_auth_chains.event_id IS NULL
-                    AND event_auth_chain_to_calculate.event_id IS NULL
-                    AND %(tuple_cmp)s
-                ORDER BY topological_ordering, stream_ordering
-                LIMIT ?
-            """ % {
-                "tuple_cmp": tuple_clause,
-            }
-
-            args = [current_room_id]
-            args.extend(tuple_args)
-            args.append(batch_size)
-
-            txn.execute(sql, args)
-            rows = txn.fetchall()
-
-            # Put the results in the necessary format for
-            # `_add_chain_cover_index`
-            event_to_room_id = {row[0]: current_room_id for row in rows}
-            event_to_types = {row[0]: (row[1], row[2]) for row in rows}
-
-            new_last_depth = rows[-1][3] if rows else last_depth  # type: int
-            new_last_stream = rows[-1][4] if rows else last_stream  # type: int
-
-            count = len(rows)
-
-            # We also need to fetch the auth events for them.
-            auth_events = self.db_pool.simple_select_many_txn(
-                txn,
-                table="event_auth",
-                column="event_id",
-                iterable=event_to_room_id,
-                keyvalues={},
-                retcols=("event_id", "auth_id"),
-            )
-
-            event_to_auth_chain = {}  # type: Dict[str, List[str]]
-            for row in auth_events:
-                event_to_auth_chain.setdefault(row["event_id"], []).append(
-                    row["auth_id"]
-                )
-
-            # Calculate and persist the chain cover index for this set of events.
-            #
-            # Annoyingly we need to gut wrench into the persit event store so that
-            # we can reuse the function to calculate the chain cover for rooms.
-            PersistEventsStore._add_chain_cover_index(
-                txn,
-                self.db_pool,
-                event_to_room_id,
-                event_to_types,
-                event_to_auth_chain,
-            )
-
-            return new_last_depth, new_last_stream, count
-
-        last_depth, last_stream, count = await self.db_pool.runInteraction(
-            "_chain_cover_index", _calculate_auth_chain, last_depth, last_stream
-        )
-
-        total_rows_processed = count
-
-        if count < batch_size and not has_set_room_has_chain_index:
+        for room_id, (depth, stream) in result.finished_room_map.items():
             # If we've done all the events in the room we flip the
             # `has_auth_chain_index` in the DB. Note that it's possible for
             # further events to be persisted between the above and setting the
@@ -860,42 +772,159 @@ def _calculate_auth_chain(
 
             await self.db_pool.simple_update(
                 table="rooms",
-                keyvalues={"room_id": current_room_id},
+                keyvalues={"room_id": room_id},
                 updatevalues={"has_auth_chain_index": True},
                 desc="_chain_cover_index",
             )
-            has_set_room_has_chain_index = True
 
             # Handle any events that might have raced with us flipping the
             # bit above.
-            last_depth, last_stream, count = await self.db_pool.runInteraction(
-                "_chain_cover_index", _calculate_auth_chain, last_depth, last_stream
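+            # With `batch_size=None` and `single_room=True` this sweeps up every
+            # remaining uncovered event in this room in a single transaction.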
+            result = await self.db_pool.runInteraction(
+                "_chain_cover_index",
+                self._calculate_chain_cover_txn,
+                room_id,
+                depth,
+                stream,
+                batch_size=None,
+                single_room=True,
             )
 
-            total_rows_processed += count
+            total_rows_processed += result.processed_count
 
-            # Note that at this point its technically possible that more events
-            # than our `batch_size` have been persisted without their chain
-            # cover, so we need to continue processing this room if the last
-            # count returned was equal to the `batch_size`.
+        if finished:
+            await self.db_pool.updates._end_background_update("chain_cover")
+            return total_rows_processed
 
-        if count < batch_size:
-            # We've finished calculating the index for this room, move on to the
-            # next room.
-            await self.db_pool.updates._background_update_progress(
-                "chain_cover", {"current_room_id": current_room_id, "finished": True},
-            )
-        else:
-            # We still have outstanding events to calculate the index for.
-            await self.db_pool.updates._background_update_progress(
-                "chain_cover",
-                {
-                    "current_room_id": current_room_id,
-                    "last_depth": last_depth,
-                    "last_stream": last_stream,
-                    "has_auth_chain_index": has_set_room_has_chain_index,
-                    "finished": False,
-                },
-            )
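+        # Resumption state is now just the (room_id, depth, stream) position we
+        # reached; no separate `finished` flag is stored any more.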
+        await self.db_pool.updates._background_update_progress(
+            "chain_cover",
+            {
+                "current_room_id": current_room_id,
+                "last_depth": last_depth,
+                "last_stream": last_stream,
+            },
+        )
 
         return total_rows_processed
+
+    def _calculate_chain_cover_txn(
+        self,
+        txn: Cursor,
+        last_room_id: str,
+        last_depth: int,
+        last_stream: int,
+        batch_size: Optional[int],
+        single_room: bool,
+    ) -> _CalculateChainCover:
+        """Calculate the chain cover for `batch_size` events, ordered by
+        `(room_id, depth, stream)`.
+
+        Args:
+            txn,
+            last_room_id, last_depth, last_stream: The `(room_id, depth, stream)`
+                tuple to fetch results after.
+            batch_size: The maximum number of events to process. If None then
+                no limit.
+            single_room: Whether to calculate the index for just the given
+                room.
+        """
+
+        # Get the next set of events in the room (that we haven't already
+        # computed chain cover for). We do this in topological order.
+
+        # We want to do a `(room_id, topological_ordering, stream_ordering) > (?, ?, ?)`
+        # comparison, but that is not supported on older SQLite versions
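+        # (`make_tuple_comparison_clause` emits either the row comparison directly
+        # or an equivalent expanded OR/AND clause, depending on what the database
+        # engine supports.)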
+        tuple_clause, tuple_args = make_tuple_comparison_clause(
+            self.database_engine,
+            [
+                ("events.room_id", last_room_id),
+                ("topological_ordering", last_depth),
+                ("stream_ordering", last_stream),
+            ],
+        )
+
+        extra_clause = ""
+        if single_room:
+            extra_clause = "AND events.room_id = ?"
+            tuple_args.append(last_room_id)
+
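+        # The query orders by room_id first, so a batch works through one room
+        # completely before starting the next; that is what makes it possible to
+        # spot fully-processed rooms when building `finished_rooms` below.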
+        sql = """
+            SELECT
+                event_id, state_events.type, state_events.state_key,
+                topological_ordering, stream_ordering,
+                events.room_id
+            FROM events
+            INNER JOIN state_events USING (event_id)
+            LEFT JOIN event_auth_chains USING (event_id)
+            LEFT JOIN event_auth_chain_to_calculate USING (event_id)
+            WHERE event_auth_chains.event_id IS NULL
+                AND event_auth_chain_to_calculate.event_id IS NULL
+                AND %(tuple_cmp)s
+                %(extra)s
+            ORDER BY events.room_id, topological_ordering, stream_ordering
+            %(limit)s
+        """ % {
+            "tuple_cmp": tuple_clause,
+            "limit": "LIMIT ?" if batch_size is not None else "",
+            "extra": extra_clause,
+        }
+
+        if batch_size is not None:
+            tuple_args.append(batch_size)
+
+        txn.execute(sql, tuple_args)
+        rows = txn.fetchall()
+
+        # Put the results in the necessary format for
+        # `_add_chain_cover_index`
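+        # Each row is (event_id, type, state_key, topological_ordering,
+        # stream_ordering, room_id), matching the SELECT above.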
+        event_to_room_id = {row[0]: row[5] for row in rows}
+        event_to_types = {row[0]: (row[1], row[2]) for row in rows}
+
+        # Calculate the new last position we've processed up to.
+        new_last_depth = rows[-1][3] if rows else last_depth  # type: int
+        new_last_stream = rows[-1][4] if rows else last_stream  # type: int
+        new_last_room_id = rows[-1][5] if rows else ""  # type: str
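+        # If no rows were returned these fall back to the passed-in position (and
+        # an empty room ID); the caller then sees `processed_count == 0` and treats
+        # the update as finished, so the fallbacks are never persisted as progress.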
+
+        # Map from room_id to last depth/stream_ordering processed for the room,
+        # excluding the last room (which we're likely still processing). We also
+        # need to include the room passed in if it's not included in the result
+        # set (as we then know we've processed all events in said room).
+        #
+        # This is the set of rooms that we can now safely flip the
+        # `has_auth_chain_index` bit for.
+        finished_rooms = {
+            row[5]: (row[3], row[4]) for row in rows if row[5] != new_last_room_id
+        }
+        if last_room_id not in finished_rooms and last_room_id != new_last_room_id:
+            finished_rooms[last_room_id] = (last_depth, last_stream)
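+        # e.g. (hypothetical) if a batch covers rooms !a and !b and ends partway
+        # through !c, then !a and !b land in `finished_rooms` while !c becomes the
+        # new resume position.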
+
+        count = len(rows)
+
+        # We also need to fetch the auth events for them.
+        auth_events = self.db_pool.simple_select_many_txn(
+            txn,
+            table="event_auth",
+            column="event_id",
+            iterable=event_to_room_id,
+            keyvalues={},
+            retcols=("event_id", "auth_id"),
+        )
+
+        event_to_auth_chain = {}  # type: Dict[str, List[str]]
+        for row in auth_events:
+            event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"])
+
+        # Calculate and persist the chain cover index for this set of events.
+        #
+        # Annoyingly we need to gut wrench into the persist event store so that
+        # we can reuse the function to calculate the chain cover for rooms.
+        PersistEventsStore._add_chain_cover_index(
+            txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain,
+        )
+
+        return _CalculateChainCover(
+            room_id=new_last_room_id,
+            depth=new_last_depth,
+            stream=new_last_stream,
+            processed_count=count,
+            finished_room_map=finished_rooms,
+        )