diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 0281fcf82f2..e1fb6c459b9 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -160,6 +160,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
 	if err != nil {
 		return errors.Trace(err)
 	}
+	if barrierTs < checkpointTs {
+		// This means that the DDL resolved-ts has not yet reached checkpointTs,
+		// so it would be premature to schedule tables or to update the status.
+		// Return early and wait for a later tick.
+		return nil
+	}
 	shouldUpdateState, err := c.scheduler.Tick(c.state, c.schema.AllPhysicalTables(), captures)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -204,7 +210,10 @@ LOOP:
 	if c.state.Info.SyncPointEnabled {
 		c.barriers.Update(syncPointBarrier, checkpointTs)
 	}
-	c.barriers.Update(ddlJobBarrier, checkpointTs)
+	// The DDL puller is started from (checkpointTs-1) so that a DDL job
+	// committed exactly at checkpointTs can still be executed by CDC, so the
+	// DDL barrier has to start from (checkpointTs-1) as well.
+	c.barriers.Update(ddlJobBarrier, checkpointTs-1)
 	c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
 	var err error
 	// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completed executed.
diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go
index 562054d1ecf..39546d90cd8 100644
--- a/cdc/owner/changefeed_test.go
+++ b/cdc/owner/changefeed_test.go
@@ -23,9 +23,11 @@ import (
 	timodel "github.com/pingcap/parser/model"
 	"github.com/pingcap/ticdc/cdc/entry"
 	"github.com/pingcap/ticdc/cdc/model"
+	"github.com/pingcap/ticdc/pkg/config"
 	cdcContext "github.com/pingcap/ticdc/pkg/context"
 	"github.com/pingcap/ticdc/pkg/orchestrator"
 	"github.com/pingcap/ticdc/pkg/util/testleak"
+	"github.com/pingcap/ticdc/pkg/version"
 	"github.com/tikv/client-go/v2/oracle"
 )
@@ -196,11 +198,33 @@ func (s *changefeedSuite) TestHandleError(c *check.C) {
 
 func (s *changefeedSuite) TestExecDDL(c *check.C) {
 	defer testleak.AfterTest(c)()
-	ctx := cdcContext.NewBackendContext4Test(true)
-	cf, state, captures, tester := createChangefeed4Test(ctx, c)
-	defer cf.Close()
+	helper := entry.NewSchemaTestHelper(c)
 	defer helper.Close()
+	// Creates a table, which will be dropped at the start-ts of the changefeed.
+	// It is expected that the changefeed DOES NOT replicate this table.
+	helper.DDL2Job("create database test0")
+	job := helper.DDL2Job("create table test0.table0(id int primary key)")
+	startTs := job.BinlogInfo.FinishedTS + 1000
+
+	ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{
+		KVStorage: helper.Storage(),
+		CaptureInfo: &model.CaptureInfo{
+			ID:            "capture-id-test",
+			AdvertiseAddr: "127.0.0.1:0000",
+			Version:       version.ReleaseVersion,
+		},
+	})
+	ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
+		ID: "changefeed-id-test",
+		Info: &model.ChangeFeedInfo{
+			StartTs: startTs,
+			Config:  config.GetDefaultReplicaConfig(),
+		},
+	})
+
+	cf, state, captures, tester := createChangefeed4Test(ctx, c)
+	defer cf.Close()
 	tickThreeTime := func() {
 		cf.Tick(ctx, state, captures)
 		tester.MustApplyPatches()
@@ -212,16 +236,31 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
 	// pre check and initialize
 	tickThreeTime()
+	c.Assert(cf.schema.AllPhysicalTables(), check.HasLen, 1)
+	c.Assert(state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Operation, check.HasLen, 0)
+	c.Assert(state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, check.HasLen, 0)
+
+	job = helper.DDL2Job("drop table test0.table0")
 
 	// ddl puller resolved ts grow uo
 	mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
-	mockDDLPuller.resolvedTs += 1000
+	mockDDLPuller.resolvedTs = startTs
 	mockAsyncSink := cf.sink.(*mockAsyncSink)
+	job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs
+	mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
 	// three tick to make sure all barriers set in initialize is handled
 	tickThreeTime()
 	c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
+	// The ephemeral table should have left no trace in the schema cache
+	c.Assert(cf.schema.AllPhysicalTables(), check.HasLen, 0)
+
+	// executing the ddl finished
+	mockAsyncSink.ddlDone = true
+	mockDDLPuller.resolvedTs += 1000
+	tickThreeTime()
+	c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
 
 	// handle create database
-	job := helper.DDL2Job("create database test1")
+	job = helper.DDL2Job("create database test1")
 	mockDDLPuller.resolvedTs += 1000
 	job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs
 	mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
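
The two owner-side changes above work together: the ddlJobBarrier now starts at (checkpointTs-1), matching the DDL puller's start point, and tick returns early while the barrier still lags behind the checkpoint. The following is a minimal standalone sketch of that logic; the names shouldSkipTick and initialDDLBarrier are illustrative only and are not part of the ticdc codebase.

package main

import "fmt"

// shouldSkipTick mirrors the early-return guard added to changefeed.tick:
// while the DDL barrier (driven by the DDL puller's resolved-ts) is still
// behind the checkpoint, scheduling tables or updating the status would be
// premature, so the owner simply waits for a later tick.
func shouldSkipTick(barrierTs, checkpointTs uint64) bool {
	return barrierTs < checkpointTs
}

// initialDDLBarrier mirrors the new ddlJobBarrier initialization: the DDL
// puller starts from checkpointTs-1, so the barrier starts there too;
// otherwise a DDL committed exactly at checkpointTs could be missed.
func initialDDLBarrier(checkpointTs uint64) uint64 {
	return checkpointTs - 1
}

func main() {
	checkpointTs := uint64(1000)

	barrierTs := initialDDLBarrier(checkpointTs)         // 999
	fmt.Println(shouldSkipTick(barrierTs, checkpointTs)) // true: wait for the DDL puller to catch up

	barrierTs = checkpointTs                             // the DDL puller's resolved-ts has reached the checkpoint
	fmt.Println(shouldSkipTick(barrierTs, checkpointTs)) // false: safe to schedule tables and advance the status
}

Starting the barrier one timestamp below the checkpoint keeps a DDL whose FinishedTS equals the changefeed's start-ts from being silently skipped, which is exactly the case the new drop-table scenario in TestExecDDL exercises.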