diff --git a/cdc/scheduler/internal/tp/capture_manager.go b/cdc/scheduler/internal/tp/capture_manager.go index 4bb4d5a250e..c8823f644b5 100644 --- a/cdc/scheduler/internal/tp/capture_manager.go +++ b/cdc/scheduler/internal/tp/capture_manager.go @@ -22,19 +22,19 @@ import ( // CaptureState is the state of a capture. // -// ┌──────────────┐ Heartbeat Resp ┌─────────────┐ -// │ Uninitialize ├───────────────>│ Initialized │ -// └──────┬───────┘ └──────┬──────┘ -// │ │ -// IsStopping │ ┌──────────┐ │ IsStopping -// └────────> │ Stopping │ <───────┘ +// ┌───────────────┐ Heartbeat Resp ┌─────────────┐ +// │ Uninitialized ├───────────────>│ Initialized │ +// └──────┬────────┘ └──────┬──────┘ +// │ │ +// IsStopping │ ┌──────────┐ │ IsStopping +// └────────> │ Stopping │ <────────┘ // └──────────┘ type CaptureState int const ( - // CaptureStateUninitialize means the capture status is unknown, + // CaptureStateUninitialized means the capture status is unknown, // no heartbeat response received yet. - CaptureStateUninitialize CaptureState = 1 + CaptureStateUninitialized CaptureState = 1 // CaptureStateInitialized means owner has received heartbeat response. CaptureStateInitialized CaptureState = 2 // CaptureStateStopping means the capture is removing, e.g., shutdown. @@ -50,14 +50,14 @@ type CaptureStatus struct { } func newCaptureStatus(rev schedulepb.OwnerRevision) *CaptureStatus { - return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialize} + return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialized} } func (c *CaptureStatus) handleHeartbeatResponse( resp *schedulepb.HeartbeatResponse, epoch schedulepb.ProcessorEpoch, ) { // Check epoch for initialized captures. - if c.State != CaptureStateUninitialize && c.Epoch.Epoch != epoch.Epoch { + if c.State != CaptureStateUninitialized && c.Epoch.Epoch != epoch.Epoch { log.Warn("tpscheduler: ignore heartbeat response", zap.String("epoch", c.Epoch.Epoch), zap.String("respEpoch", epoch.Epoch), @@ -65,7 +65,7 @@ func (c *CaptureStatus) handleHeartbeatResponse( return } - if c.State == CaptureStateUninitialize { + if c.State == CaptureStateUninitialized { c.Epoch = epoch c.State = CaptureStateInitialized } @@ -112,7 +112,10 @@ func (c *captureManager) CheckAllCaptureInitialized() bool { func (c *captureManager) checkAllCaptureInitialized() bool { for _, captureStatus := range c.Captures { - if captureStatus.State == CaptureStateUninitialize { + // CaptureStateStopping is also considered initialized, because when + // a capture shutdown, it becomes stopping, we need to move its tables + // to other captures. + if captureStatus.State == CaptureStateUninitialized { return false } } @@ -190,7 +193,7 @@ func (c *captureManager) HandleAliveCaptureUpdate( log.Info("tpscheduler: removed a capture", zap.String("capture", id)) delete(c.Captures, id) - // Only update changes after initializtion. + // Only update changes after initialization. if !c.initialized { continue } diff --git a/cdc/scheduler/internal/tp/capture_manager_test.go b/cdc/scheduler/internal/tp/capture_manager_test.go index bc5f65cbd51..dd64d627953 100644 --- a/cdc/scheduler/internal/tp/capture_manager_test.go +++ b/cdc/scheduler/internal/tp/capture_manager_test.go @@ -27,7 +27,7 @@ func TestCaptureStatusHandleHeartbeatResponse(t *testing.T) { rev := schedulepb.OwnerRevision{Revision: 1} epoch := schedulepb.ProcessorEpoch{Epoch: "test"} c := newCaptureStatus(rev) - require.Equal(t, CaptureStateUninitialize, c.State) + require.Equal(t, CaptureStateUninitialized, c.State) // Uninitialize -> Initialized c.handleHeartbeatResponse(&schedulepb.HeartbeatResponse{}, epoch)