@@ -86,6 +86,22 @@ pub struct CommittedState {
8686 /// - system tables: `st_view_sub`, `st_view_arg`
8787 /// - Tables which back views.
8888 pub ( super ) ephemeral_tables : EphemeralTables ,
89+
90+ /// Rows within `st_column` which should be ignored during replay
91+ /// due to having been superseded by a new row representing the same column.
92+ ///
93+ /// During replay, we visit all of the inserts table-by-table, followed by all of the deletes table-by-table.
94+ /// This means that, when multiple columns of a table change type within the same transaction,
95+ /// we see all of the newly-inserted `st_column` rows first, and then later, all of the deleted rows.
96+ /// We may even see inserts into the altered table before seeing the `st_column` deletes!
97+ ///
98+ /// In order to maintain a proper view of the schema of tables during replay,
99+ /// we must remember the old versions of the `st_column` rows when we insert the new ones,
100+ /// so that we can respect only the new versions.
101+ ///
102+ /// We insert into this set during [`Self::replay_insert`] of `st_column` rows
103+ /// and delete from it during [`Self::replay_delete`] of `st_column` rows.
104+ replay_columns_to_ignore : HashSet < RowPointer > ,
89105}
90106
91107impl CommittedState {
@@ -106,6 +122,7 @@ impl MemoryUsage for CommittedState {
106122 table_dropped,
107123 read_sets,
108124 ephemeral_tables,
125+ replay_columns_to_ignore,
109126 } = self ;
110127 // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
111128 next_tx_offset. heap_usage ( )
@@ -115,6 +132,7 @@ impl MemoryUsage for CommittedState {
115132 + table_dropped. heap_usage ( )
116133 + read_sets. heap_usage ( )
117134 + ephemeral_tables. heap_usage ( )
135+ + replay_columns_to_ignore. heap_usage ( )
118136 }
119137}
120138
@@ -180,6 +198,7 @@ impl CommittedState {
180198 read_sets : <_ >:: default ( ) ,
181199 page_pool,
182200 ephemeral_tables : <_ >:: default ( ) ,
201+ replay_columns_to_ignore : <_ >:: default ( ) ,
183202 }
184203 }
185204
@@ -377,7 +396,7 @@ impl CommittedState {
377396 let ( table, blob_store, _, page_pool) = self . get_table_and_blob_store_mut ( table_id) ?;
378397
379398 // Delete the row.
380- table
399+ let row_ptr = table
381400 . delete_equal_row ( page_pool, blob_store, row)
382401 . map_err ( TableError :: Bflatn ) ?
383402 . ok_or_else ( || anyhow ! ( "Delete for non-existent row when replaying transaction" ) ) ?;
@@ -395,6 +414,19 @@ impl CommittedState {
395414 self . table_dropped . insert ( dropped_table_id) ;
396415 }
397416
417+ if table_id == ST_COLUMN_ID {
418+ // We may have reached the corresponding delete to an insert in `st_column`
419+ // as the result of a column-type-altering migration.
420+ // Now that the outdated `st_column` row isn't present any more,
421+ // we can stop ignoring it.
422+ //
423+ // It's also possible that we're deleting this column as the result of a deleted table,
424+ // and that there wasn't any corresponding insert at all.
425+ // If that's the case, `row_ptr` won't be in `self.replay_columns_to_ignore`,
426+ // which is fine.
427+ self . replay_columns_to_ignore . remove ( & row_ptr) ;
428+ }
429+
398430 Ok ( ( ) )
399431 }
400432
@@ -418,33 +450,58 @@ impl CommittedState {
418450 // The type of a table has changed, so figure out which.
419451 // The first column in `StColumnRow` is `table_id`.
420452 let row_ptr = row_ref. pointer ( ) ;
421- self . st_column_changed ( row, row_ptr) ?;
453+ let table_id = self . ignore_previous_versions_of_column ( row, row_ptr) ?;
454+ self . st_column_changed ( table_id) ?;
422455 }
423456
424457 Ok ( ( ) )
425458 }
426459
460+ /// Mark all `st_column` rows which refer to the same column as `st_column_row`
461+ /// other than the one at `row_pointer` as outdated
462+ /// by storing them in [`Self::replay_columns_to_ignore`].
463+ ///
464+ /// Returns the ID of the table to which `st_column_row` belongs.
465+ fn ignore_previous_versions_of_column (
466+ & mut self ,
467+ st_column_row : & ProductValue ,
468+ row_ptr : RowPointer ,
469+ ) -> Result < TableId > {
470+ let target_table_id = Self :: read_table_id ( st_column_row) ;
471+ let target_col_id = ColId :: deserialize ( ValueDeserializer :: from_ref ( & st_column_row. elements [ 1 ] ) )
472+ . expect ( "second field in `st_column` should decode to a `ColId`" ) ;
473+
474+ let outdated_st_column_rows = iter_st_column_for_table ( self , & target_table_id. into ( ) ) ?
475+ . filter_map ( |row_ref| {
476+ StColumnRow :: try_from ( row_ref)
477+ . map ( |c| ( c. col_pos == target_col_id && row_ref. pointer ( ) != row_ptr) . then ( || row_ref. pointer ( ) ) )
478+ . transpose ( )
479+ } )
480+ . collect :: < Result < Vec < RowPointer > > > ( ) ?;
481+
482+ for row in outdated_st_column_rows {
483+ self . replay_columns_to_ignore . insert ( row) ;
484+ }
485+
486+ Ok ( target_table_id)
487+ }
488+
427489 /// Refreshes the columns and layout of a table
428490 /// when a `row` has been inserted from `st_column`.
429491 ///
430492 /// The `row_ptr` is a pointer to `row`.
431- fn st_column_changed ( & mut self , row : & ProductValue , row_ptr : RowPointer ) -> Result < ( ) > {
432- let target_table_id = Self :: read_table_id ( row) ;
433- let target_col_id = ColId :: deserialize ( ValueDeserializer :: from_ref ( & row. elements [ 1 ] ) )
434- . expect ( "second field in `st_column` should decode to a `ColId`" ) ;
435-
493+ fn st_column_changed ( & mut self , table_id : TableId ) -> Result < ( ) > {
436494 // We're replaying and we don't have unique constraints yet.
437495 // Due to replay handling all inserts first and deletes after,
438496 // when processing `st_column` insert/deletes,
439497 // we may end up with two definitions for the same `col_pos`.
440498 // Of those two, we're interested in the one we just inserted
441499 // and not the other one, as it is being replaced.
442- let mut columns = iter_st_column_for_table ( self , & target_table_id. into ( ) ) ?
443- . filter_map ( |row_ref| {
444- StColumnRow :: try_from ( row_ref)
445- . map ( |c| ( c. col_pos != target_col_id || row_ref. pointer ( ) == row_ptr) . then ( || c. into ( ) ) )
446- . transpose ( )
447- } )
500+ // `Self::ignore_previous_version_of_column` has marked the old version as ignored,
501+ // so filter only the non-ignored columns.
502+ let mut columns = iter_st_column_for_table ( self , & table_id. into ( ) ) ?
503+ . filter ( |row_ref| self . replay_columns_to_ignore . contains ( & row_ref. pointer ( ) ) )
504+ . map ( |row_ref| StColumnRow :: try_from ( row_ref) . map ( Into :: into) )
448505 . collect :: < Result < Vec < _ > > > ( ) ?;
449506
450507 // Columns in `st_column` are not in general sorted by their `col_pos`,
@@ -453,13 +510,23 @@ impl CommittedState {
453510 columns. sort_by_key ( |col : & ColumnSchema | col. col_pos ) ;
454511
455512 // Update the columns and layout of the the in-memory table.
456- if let Some ( table) = self . tables . get_mut ( & target_table_id ) {
513+ if let Some ( table) = self . tables . get_mut ( & table_id ) {
457514 table. change_columns_to ( columns) . map_err ( TableError :: from) ?;
458515 }
459516
460517 Ok ( ( ) )
461518 }
462519
520+ pub ( super ) fn replay_end_tx ( & mut self ) -> Result < ( ) > {
521+ self . next_tx_offset += 1 ;
522+
523+ if !self . replay_columns_to_ignore . is_empty ( ) {
524+ Err ( anyhow:: anyhow!( "`CommittedState::replay_columns_to_ignore` should be empty at the end of a commit, but found {} entries" , self . replay_columns_to_ignore. len( ) ) . into ( ) )
525+ } else {
526+ Ok ( ( ) )
527+ }
528+ }
529+
463530 /// Assuming that a `TableId` is stored as the first field in `row`, read it.
464531 fn read_table_id ( row : & ProductValue ) -> TableId {
465532 TableId :: deserialize ( ValueDeserializer :: from_ref ( & row. elements [ 0 ] ) )
0 commit comments