Skip to content

Commit

Permalink
owner: fix scheduling tables too early (#2626) (#2635)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 25, 2021
1 parent 1d4209c commit 196112d
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 6 deletions.
11 changes: 10 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
if err != nil {
return errors.Trace(err)
}
if barrierTs < checkpointTs {
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
return nil
}
shouldUpdateState, err := c.scheduler.Tick(c.state, c.schema.AllPhysicalTables(), captures)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -204,7 +210,10 @@ LOOP:
if c.state.Info.SyncPointEnabled {
c.barriers.Update(syncPointBarrier, checkpointTs)
}
c.barriers.Update(ddlJobBarrier, checkpointTs)
// Since we are starting DDL puller from (checkpointTs-1) to make
// the DDL committed at checkpointTs executable by CDC, we need to set
// the DDL barrier to the correct start point.
c.barriers.Update(ddlJobBarrier, checkpointTs-1)
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
var err error
// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completed executed.
Expand Down
49 changes: 44 additions & 5 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
"github.com/tikv/client-go/v2/oracle"
)

Expand Down Expand Up @@ -196,11 +198,33 @@ func (s *changefeedSuite) TestHandleError(c *check.C) {

func (s *changefeedSuite) TestExecDDL(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, c)
defer cf.Close()

helper := entry.NewSchemaTestHelper(c)
defer helper.Close()
// Creates a table, which will be deleted at the start-ts of the changefeed.
// It is expected that the changefeed DOES NOT replicate this table.
helper.DDL2Job("create database test0")
job := helper.DDL2Job("create table test0.table0(id int primary key)")
startTs := job.BinlogInfo.FinishedTS + 1000

ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{
KVStorage: helper.Storage(),
CaptureInfo: &model.CaptureInfo{
ID: "capture-id-test",
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
Info: &model.ChangeFeedInfo{
StartTs: startTs,
Config: config.GetDefaultReplicaConfig(),
},
})

cf, state, captures, tester := createChangefeed4Test(ctx, c)
defer cf.Close()
tickThreeTime := func() {
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
Expand All @@ -212,16 +236,31 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
// pre check and initialize
tickThreeTime()

c.Assert(cf.schema.AllPhysicalTables(), check.HasLen, 1)
c.Assert(state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Operation, check.HasLen, 0)
c.Assert(state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, check.HasLen, 0)

job = helper.DDL2Job("drop table test0.table0")
// ddl puller resolved ts grow uo
mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
mockDDLPuller.resolvedTs += 1000
mockDDLPuller.resolvedTs = startTs
mockAsyncSink := cf.sink.(*mockAsyncSink)
job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
// three tick to make sure all barriers set in initialize is handled
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
// The ephemeral table should have left no trace in the schema cache
c.Assert(cf.schema.AllPhysicalTables(), check.HasLen, 0)

// executing the ddl finished
mockAsyncSink.ddlDone = true
mockDDLPuller.resolvedTs += 1000
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)

// handle create database
job := helper.DDL2Job("create database test1")
job = helper.DDL2Job("create database test1")
mockDDLPuller.resolvedTs += 1000
job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
Expand Down

0 comments on commit 196112d

Please sign in to comment.