@@ -619,7 +619,18 @@ class Porter(object):
619619 "create_port_table" , create_port_table
620620 )
621621
622- # Step 2. Get tables.
622+ # Step 2. Set up sequences
623+ #
624+ # We do this before porting the tables so that event if we fail half
625+ # way through the postgres DB always have sequences that are greater
626+ # than their respective tables. If we don't then creating the
627+ # `DataStore` object will fail due to the inconsistency.
628+ self .progress .set_state ("Setting up sequence generators" )
629+ await self ._setup_state_group_id_seq ()
630+ await self ._setup_user_id_seq ()
631+ await self ._setup_events_stream_seqs ()
632+
633+ # Step 3. Get tables.
623634 self .progress .set_state ("Fetching tables" )
624635 sqlite_tables = await self .sqlite_store .db_pool .simple_select_onecol (
625636 table = "sqlite_master" , keyvalues = {"type" : "table" }, retcol = "name"
@@ -634,7 +645,7 @@ class Porter(object):
634645 tables = set (sqlite_tables ) & set (postgres_tables )
635646 logger .info ("Found %d tables" , len (tables ))
636647
637- # Step 3 . Figure out what still needs copying
648+ # Step 4 . Figure out what still needs copying
638649 self .progress .set_state ("Checking on port progress" )
639650 setup_res = await make_deferred_yieldable (
640651 defer .gatherResults (
@@ -651,7 +662,7 @@ class Porter(object):
651662 # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`.
652663 tables_to_port_info_map = {r [0 ]: r [1 :] for r in setup_res }
653664
654- # Step 4 . Do the copying.
665+ # Step 5 . Do the copying.
655666 #
656667 # This is slightly convoluted as we need to ensure tables are ported
657668 # in the correct order due to foreign key constraints.
@@ -685,12 +696,6 @@ class Porter(object):
685696
686697 tables_ported .update (tables_to_port )
687698
688- # Step 5. Set up sequences
689- self .progress .set_state ("Setting up sequence generators" )
690- await self ._setup_state_group_id_seq ()
691- await self ._setup_user_id_seq ()
692- await self ._setup_events_stream_seqs ()
693-
694699 self .progress .done ()
695700 except Exception as e :
696701 global end_error_exec_info
@@ -848,43 +853,62 @@ class Porter(object):
848853
849854 return done , remaining + done
850855
851- def _setup_state_group_id_seq (self ):
856+ async def _setup_state_group_id_seq (self ):
857+ curr_id = await self .sqlite_store .db_pool .simple_select_one_onecol (
858+ table = "state_groups" , keyvalues = {}, retcol = "MAX(id)" , allow_none = True
859+ )
860+
861+ if not curr_id :
862+ return
863+
852864 def r (txn ):
853- txn .execute ("SELECT MAX(id) FROM state_groups" )
854- curr_id = txn .fetchone ()[0 ]
855- if not curr_id :
856- return
857865 next_id = curr_id + 1
858866 txn .execute ("ALTER SEQUENCE state_group_id_seq RESTART WITH %s" , (next_id ,))
859867
860- return self .postgres_store .db_pool .runInteraction ("setup_state_group_id_seq" , r )
868+ await self .postgres_store .db_pool .runInteraction ("setup_state_group_id_seq" , r )
869+
870+ async def _setup_user_id_seq (self ):
871+ curr_id = await self .sqlite_store .db_pool .runInteraction (
872+ "setup_user_id_seq" , find_max_generated_user_id_localpart
873+ )
861874
862- def _setup_user_id_seq (self ):
863875 def r (txn ):
864- next_id = find_max_generated_user_id_localpart ( txn ) + 1
876+ next_id = curr_id + 1
865877 txn .execute ("ALTER SEQUENCE user_id_seq RESTART WITH %s" , (next_id ,))
866878
867879 return self .postgres_store .db_pool .runInteraction ("setup_user_id_seq" , r )
868880
869- def _setup_events_stream_seqs (self ):
870- def r (txn ):
871- txn .execute ("SELECT MAX(stream_ordering) FROM events" )
872- curr_id = txn .fetchone ()[0 ]
873- if curr_id :
874- next_id = curr_id + 1
881+ async def _setup_events_stream_seqs (self ):
882+ """Set the event stream sequences to the correct values.
883+ """
884+
885+ # We get called before we've ported the events table, so we need to
886+ # fetch the current positions from the SQLite store.
887+ curr_forward_id = await self .sqlite_store .db_pool .simple_select_one_onecol (
888+ table = "events" , keyvalues = {}, retcol = "MAX(stream_ordering)" , allow_none = True
889+ )
890+
891+ curr_backward_id = await self .sqlite_store .db_pool .simple_select_one_onecol (
892+ table = "events" ,
893+ keyvalues = {},
894+ retcol = "MAX(-MIN(stream_ordering), 1)" ,
895+ allow_none = True ,
896+ )
897+
898+ def _setup_events_stream_seqs_set_pos (txn ):
899+ if curr_forward_id :
875900 txn .execute (
876- "ALTER SEQUENCE events_stream_seq RESTART WITH %s" , (next_id ,)
901+ "ALTER SEQUENCE events_stream_seq RESTART WITH %s" ,
902+ (curr_forward_id + 1 ,),
877903 )
878904
879- txn .execute ("SELECT GREATEST(-MIN(stream_ordering), 1) FROM events" )
880- curr_id = txn .fetchone ()[0 ]
881- next_id = curr_id + 1
882905 txn .execute (
883- "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s" , (next_id ,),
906+ "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s" ,
907+ (curr_backward_id + 1 ,),
884908 )
885909
886- return self .postgres_store .db_pool .runInteraction (
887- "_setup_events_stream_seqs" , r
910+ return await self .postgres_store .db_pool .runInteraction (
911+ "_setup_events_stream_seqs" , _setup_events_stream_seqs_set_pos ,
888912 )
889913
890914
0 commit comments