diff --git a/cdc/owner/scheduler.go b/cdc/owner/scheduler.go index 00f0771d815..7f47c61ff9f 100644 --- a/cdc/owner/scheduler.go +++ b/cdc/owner/scheduler.go @@ -300,6 +300,11 @@ func (s *scheduler) handleJobs(jobs []*schedulerJob) { func (s *scheduler) cleanUpFinishedOperations() { for captureID := range s.state.TaskStatuses { s.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + if status == nil { + log.Warn("task status of the capture is not found, may be the key in etcd was deleted", zap.String("captureID", captureID), zap.String("changeFeedID", s.state.ID)) + return status, false, nil + } + changed := false for tableID, operation := range status.Operation { if operation.Status == model.OperFinished { diff --git a/cdc/owner/scheduler_test.go b/cdc/owner/scheduler_test.go index 669ab85a8f5..701e1838efb 100644 --- a/cdc/owner/scheduler_test.go +++ b/cdc/owner/scheduler_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/util/testleak" ) @@ -84,8 +85,24 @@ func (s *schedulerSuite) finishTableOperation(captureID model.CaptureID, tableID func (s *schedulerSuite) TestScheduleOneCapture(c *check.C) { defer testleak.AfterTest(c)() + + s.reset(c) + captureID := "test-capture-0" + s.addCapture(captureID) + + _, _ = s.scheduler.Tick(s.state, []model.TableID{}, s.captures) + + // Manually simulate the scenario where the corresponding key was deleted in the etcd + key := &etcd.CDCKey{ + Tp: etcd.CDCKeyTypeTaskStatus, + CaptureID: captureID, + ChangefeedID: s.state.ID, + } + s.tester.MustUpdate(key.String(), nil) + s.tester.MustApplyPatches() + s.reset(c) - captureID := "test-capture-1" + captureID = "test-capture-1" s.addCapture(captureID) // add three tables