From a3a6b23784b9abd04448880ea010ae85436bdc59 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Wed, 22 Dec 2021 16:35:47 +0800 Subject: [PATCH] owner,scheduler(cdc): fix nil pointer panic in owner scheduler (#2980) (#4007) --- cdc/owner/scheduler_v1.go | 5 +++++ cdc/owner/scheduler_v1_test.go | 19 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/cdc/owner/scheduler_v1.go b/cdc/owner/scheduler_v1.go index 480a170d6a9..6fb504f83b3 100644 --- a/cdc/owner/scheduler_v1.go +++ b/cdc/owner/scheduler_v1.go @@ -309,6 +309,11 @@ func (s *oldScheduler) handleJobs(jobs []*schedulerJob) { func (s *oldScheduler) cleanUpFinishedOperations() { for captureID := range s.state.TaskStatuses { s.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + if status == nil { + log.Warn("task status of the capture is not found, may be the key in etcd was deleted", zap.String("captureID", captureID), zap.String("changeFeedID", s.state.ID)) + return status, false, nil + } + changed := false for tableID, operation := range status.Operation { if operation.Status == model.OperFinished { diff --git a/cdc/owner/scheduler_v1_test.go b/cdc/owner/scheduler_v1_test.go index e732669873f..eb2950b7190 100644 --- a/cdc/owner/scheduler_v1_test.go +++ b/cdc/owner/scheduler_v1_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/util/testleak" ) @@ -83,8 +84,24 @@ func (s *schedulerSuite) finishTableOperation(captureID model.CaptureID, tableID func (s *schedulerSuite) TestScheduleOneCapture(c *check.C) { defer testleak.AfterTest(c)() + + s.reset(c) + captureID := "test-capture-0" + s.addCapture(captureID) + + _, _ = s.scheduler.Tick(s.state, []model.TableID{}, s.captures) + + // Manually simulate the scenario where the corresponding key was deleted in the etcd + key := &etcd.CDCKey{ + Tp: etcd.CDCKeyTypeTaskStatus, + CaptureID: captureID, + ChangefeedID: s.state.ID, + } + s.tester.MustUpdate(key.String(), nil) + s.tester.MustApplyPatches() + s.reset(c) - captureID := "test-capture-1" + captureID = "test-capture-1" s.addCapture(captureID) // add three tables