Skip to content

Commit 148dac7

Browse files
committed
Add an e2e test to surface the failure scenario
1 parent e25e13a commit 148dac7

File tree

1 file changed

+127
-0
lines changed

1 file changed

+127
-0
lines changed

flow/e2e/generic_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,133 @@ func (s Generic) Test_Schema_Changes_Cutoff_Bug() {
568568
RequireEnvCanceled(t, env)
569569
}
570570

571+
// Test_Schema_Change_Lost_Column_Bug demonstrates a race condition where a column added
572+
// to the source table without a subsequent DML operation can be "lost" during schema evolution.
573+
// The scenario:
574+
// 1. CDC mirror is running with a small batch size
575+
// 2. ALTER TABLE adds good_column + INSERT (relation message sent for good_column)
576+
// 3. ALTER TABLE adds lost_column (NO subsequent DML, so no relation message yet)
577+
// 4. PeerDB syncs: adds good_column to destination, then applySchemaDelta updates catalog
578+
// with current source schema (which includes lost_column)
579+
// 5. Next INSERT triggers relation message showing lost_column exists
580+
// 6. PeerDB compares against catalog (which has lost_column) -> no delta detected
581+
// 7. Error: lost_column doesn't exist on destination but we try to insert data for it
582+
func (s Generic) Test_Schema_Change_Lost_Column_Bug() {
583+
t := s.T()
584+
585+
dstConnector, ok := s.DestinationConnector().(connectors.GetTableSchemaConnector)
586+
if !ok {
587+
t.Skip("skipping test because destination connector does not implement GetTableSchemaConnector")
588+
}
589+
590+
srcTable := "test_lost_column_bug"
591+
dstTable := "test_lost_column_bug_dst"
592+
srcTableName := AttachSchema(s, srcTable)
593+
dstTableName := s.DestinationTable(dstTable)
594+
595+
require.NoError(t, s.Source().Exec(t.Context(), fmt.Sprintf(`
596+
CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY)`,
597+
srcTableName)))
598+
599+
connectionGen := FlowConnectionGenerationConfig{
600+
FlowJobName: AddSuffix(s, srcTable),
601+
TableMappings: TableMappings(s, srcTable, dstTable),
602+
Destination: s.Peer().Name,
603+
}
604+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
605+
flowConnConfig.MaxBatchSize = 1
606+
607+
tc := NewTemporalClient(t)
608+
env := ExecutePeerflow(t, tc, flowConnConfig)
609+
SetupCDCFlowStatusQuery(t, env, flowConnConfig)
610+
611+
EnvNoError(t, env, s.Source().Exec(t.Context(), fmt.Sprintf(
612+
`ALTER TABLE %s ADD COLUMN good_column TIMESTAMP DEFAULT CURRENT_TIMESTAMP`, srcTableName)))
613+
EnvNoError(t, env, s.Source().Exec(t.Context(), fmt.Sprintf(
614+
`INSERT INTO %s DEFAULT VALUES`, srcTableName)))
615+
EnvNoError(t, env, s.Source().Exec(t.Context(), fmt.Sprintf(
616+
`ALTER TABLE %s ADD COLUMN lost_column TIMESTAMP DEFAULT CURRENT_TIMESTAMP`, srcTableName)))
617+
618+
// Wait for the first row to be synced
619+
EnvWaitForEqualTablesWithNames(env, s, "first row synced", srcTable, dstTable, "id,good_column")
620+
621+
// Verify schema after first sync
622+
expectedSchemaAfterFirstSync := &protos.TableSchema{
623+
TableIdentifier: ExpectedDestinationTableName(s, dstTable),
624+
Columns: []*protos.FieldDescription{
625+
{
626+
Name: ExpectedDestinationIdentifier(s, "id"),
627+
Type: string(types.QValueKindNumeric),
628+
TypeModifier: -1,
629+
},
630+
{
631+
Name: ExpectedDestinationIdentifier(s, "good_column"),
632+
Type: string(types.QValueKindTimestamp),
633+
TypeModifier: -1,
634+
},
635+
{
636+
Name: "_PEERDB_IS_DELETED",
637+
Type: string(types.QValueKindBoolean),
638+
TypeModifier: -1,
639+
},
640+
{
641+
Name: "_PEERDB_SYNCED_AT",
642+
Type: string(types.QValueKindTimestamp),
643+
TypeModifier: -1,
644+
},
645+
},
646+
}
647+
output, err := dstConnector.GetTableSchema(t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q,
648+
[]*protos.TableMapping{{SourceTableIdentifier: dstTableName}})
649+
EnvNoError(t, env, err)
650+
EnvTrue(t, env, CompareTableSchemas(expectedSchemaAfterFirstSync, output[dstTableName]))
651+
652+
EnvNoError(t, env, s.Source().Exec(t.Context(), fmt.Sprintf(
653+
`INSERT INTO %s DEFAULT VALUES`, srcTableName)))
654+
655+
// Wait for the first row to be synced (expect this to fail due to the bug)
656+
EnvWaitForEqualTablesWithNames(env, s, "second row synced", srcTable, dstTable, "id,good_column,lost_column")
657+
658+
// Verify schema after second sync
659+
expectedSchemaAfterSecondSync := &protos.TableSchema{
660+
TableIdentifier: ExpectedDestinationTableName(s, dstTable),
661+
Columns: []*protos.FieldDescription{
662+
{
663+
Name: ExpectedDestinationIdentifier(s, "id"),
664+
Type: string(types.QValueKindNumeric),
665+
TypeModifier: -1,
666+
},
667+
{
668+
Name: ExpectedDestinationIdentifier(s, "good_column"),
669+
Type: string(types.QValueKindTimestamp),
670+
TypeModifier: -1,
671+
},
672+
{
673+
Name: ExpectedDestinationIdentifier(s, "lost_column"),
674+
Type: string(types.QValueKindTimestamp),
675+
TypeModifier: -1,
676+
},
677+
{
678+
Name: "_PEERDB_IS_DELETED",
679+
Type: string(types.QValueKindBoolean),
680+
TypeModifier: -1,
681+
},
682+
{
683+
Name: "_PEERDB_SYNCED_AT",
684+
Type: string(types.QValueKindTimestamp),
685+
TypeModifier: -1,
686+
},
687+
},
688+
}
689+
output, err = dstConnector.GetTableSchema(t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q,
690+
[]*protos.TableMapping{{SourceTableIdentifier: dstTableName}})
691+
EnvNoError(t, env, err)
692+
EnvTrue(t, env, CompareTableSchemas(expectedSchemaAfterSecondSync, output[dstTableName]))
693+
694+
env.Cancel(t.Context())
695+
RequireEnvCanceled(t, env)
696+
}
697+
571698
func (s Generic) Test_Partitioned_Table_Without_Publish_Via_Partition_Root() {
572699
t := s.T()
573700

0 commit comments

Comments
 (0)