127 changes: 127 additions & 0 deletions flow/e2e/generic_test.go
@@ -568,6 +568,133 @@ func (s Generic) Test_Schema_Changes_Cutoff_Bug() {
RequireEnvCanceled(t, env)
}

// Test_Schema_Change_Lost_Column_Bug demonstrates a race condition where a column added
// to the source table without a subsequent DML operation can be "lost" during schema evolution.
// The scenario:
// 1. CDC mirror is running with a small batch size
// 2. ALTER TABLE adds good_column + INSERT (relation message sent for good_column)
// 3. ALTER TABLE adds lost_column (NO subsequent DML, so no relation message yet)
// 4. PeerDB syncs: adds good_column to destination, then applySchemaDelta updates catalog
// with current source schema (which includes lost_column)
// 5. Next INSERT triggers relation message showing lost_column exists
// 6. PeerDB compares against catalog (which has lost_column) -> no delta detected
// 7. Error: lost_column doesn't exist on destination but we try to insert data for it
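//
// A minimal, hypothetical sketch of the comparison in step 6 (catalogSchema and
// relationColumns are illustrative names, not PeerDB's actual identifiers): the
// catalog was refreshed from the live source schema in step 4, so lost_column is
// already present there and the loop finds nothing to add to the destination.
//
//	var added []string
//	for name := range relationColumns {
//		if _, inCatalog := catalogSchema[name]; !inCatalog {
//			added = append(added, name)
//		}
//	}
//	// added stays empty even though the destination still lacks lost_column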
func (s Generic) Test_Schema_Change_Lost_Column_Bug() {
t := s.T()

dstConnector, ok := s.DestinationConnector().(connectors.GetTableSchemaConnector)
if !ok {
t.Skip("skipping test because destination connector does not implement GetTableSchemaConnector")
}

srcTable := "test_lost_column_bug"
dstTable := "test_lost_column_bug_dst"
srcTableName := AttachSchema(s, srcTable)
dstTableName := s.DestinationTable(dstTable)

require.NoError(t, s.Source().Exec(t.Context(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY)`,
srcTableName)))

connectionGen := FlowConnectionGenerationConfig{
FlowJobName: AddSuffix(s, srcTable),
TableMappings: TableMappings(s, srcTable, dstTable),
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 1

tc := NewTemporalClient(t)
env := ExecutePeerflow(t, tc, flowConnConfig)
SetupCDCFlowStatusQuery(t, env, flowConnConfig)

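// ALTER + INSERT: the INSERT forces a relation message that includes good_column,
// while the second ALTER has no subsequent DML, so no relation message mentions lost_column yet.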
EnvNoError(t, env, s.Source().Exec(t.Context(), fmt.Sprintf(
`ALTER TABLE %s ADD COLUMN good_column TIMESTAMP DEFAULT CURRENT_TIMESTAMP`, srcTableName)))
EnvNoError(t, env, s.Source().Exec(t.Context(), fmt.Sprintf(
`INSERT INTO %s DEFAULT VALUES`, srcTableName)))
EnvNoError(t, env, s.Source().Exec(t.Context(), fmt.Sprintf(
`ALTER TABLE %s ADD COLUMN lost_column TIMESTAMP DEFAULT CURRENT_TIMESTAMP`, srcTableName)))

// Wait for the first row to be synced
EnvWaitForEqualTablesWithNames(env, s, "first row synced", srcTable, dstTable, "id,good_column")

// Verify schema after first sync
expectedSchemaAfterFirstSync := &protos.TableSchema{
TableIdentifier: ExpectedDestinationTableName(s, dstTable),
Columns: []*protos.FieldDescription{
{
Name: ExpectedDestinationIdentifier(s, "id"),
Type: string(types.QValueKindNumeric),
TypeModifier: -1,
},
{
Name: ExpectedDestinationIdentifier(s, "good_column"),
Type: string(types.QValueKindTimestamp),
TypeModifier: -1,
},
{
Name: "_PEERDB_IS_DELETED",
Type: string(types.QValueKindBoolean),
TypeModifier: -1,
},
{
Name: "_PEERDB_SYNCED_AT",
Type: string(types.QValueKindTimestamp),
TypeModifier: -1,
},
},
}
output, err := dstConnector.GetTableSchema(t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q,
[]*protos.TableMapping{{SourceTableIdentifier: dstTableName}})
EnvNoError(t, env, err)
EnvTrue(t, env, CompareTableSchemas(expectedSchemaAfterFirstSync, output[dstTableName]))

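// This INSERT is the first DML after lost_column was added, so the resulting
// relation message is the first one to include lost_column (step 5 above).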
EnvNoError(t, env, s.Source().Exec(t.Context(), fmt.Sprintf(
`INSERT INTO %s DEFAULT VALUES`, srcTableName)))

// Wait for the second row to be synced (expected to fail while the bug is present)
EnvWaitForEqualTablesWithNames(env, s, "second row synced", srcTable, dstTable, "id,good_column,lost_column")

// Verify schema after second sync
expectedSchemaAfterSecondSync := &protos.TableSchema{
TableIdentifier: ExpectedDestinationTableName(s, dstTable),
Columns: []*protos.FieldDescription{
{
Name: ExpectedDestinationIdentifier(s, "id"),
Type: string(types.QValueKindNumeric),
TypeModifier: -1,
},
{
Name: ExpectedDestinationIdentifier(s, "good_column"),
Type: string(types.QValueKindTimestamp),
TypeModifier: -1,
},
{
Name: ExpectedDestinationIdentifier(s, "lost_column"),
Type: string(types.QValueKindTimestamp),
TypeModifier: -1,
},
{
Name: "_PEERDB_IS_DELETED",
Type: string(types.QValueKindBoolean),
TypeModifier: -1,
},
{
Name: "_PEERDB_SYNCED_AT",
Type: string(types.QValueKindTimestamp),
TypeModifier: -1,
},
},
}
output, err = dstConnector.GetTableSchema(t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q,
[]*protos.TableMapping{{SourceTableIdentifier: dstTableName}})
EnvNoError(t, env, err)
EnvTrue(t, env, CompareTableSchemas(expectedSchemaAfterSecondSync, output[dstTableName]))

env.Cancel(t.Context())
RequireEnvCanceled(t, env)
}

func (s Generic) Test_Partitioned_Table_Without_Publish_Via_Partition_Root() {
t := s.T()
