Skip to content

Commit

Permalink
tp: add more comments and fix typo
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Jun 2, 2022
1 parent b2f44a6 commit 66096f6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
29 changes: 16 additions & 13 deletions cdc/scheduler/internal/tp/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ import (

// CaptureState is the state of a capture.
//
// ┌──────────────┐ Heartbeat Resp ┌─────────────┐
// │ Uninitialize ├───────────────>│ Initialized │
// └──────┬───────┘ └──────┬──────┘
// │ │
// IsStopping │ ┌──────────┐ │ IsStopping
// └────────> │ Stopping │ <───────┘
// ┌──────────────┐ Heartbeat Resp ┌─────────────┐
// │ Uninitialized ├───────────────>│ Initialized │
// └──────┬───────┘ └──────┬──────┘
// │
// IsStopping │ ┌──────────┐ │ IsStopping
// └────────> │ Stopping │ <───────
// └──────────┘
type CaptureState int

const (
// CaptureStateUninitialize means the capture status is unknown,
// CaptureStateUninitialized means the capture status is unknown,
// no heartbeat response received yet.
CaptureStateUninitialize CaptureState = 1
CaptureStateUninitialized CaptureState = 1
// CaptureStateInitialized means owner has received heartbeat response.
CaptureStateInitialized CaptureState = 2
// CaptureStateStopping means the capture is removing, e.g., shutdown.
Expand All @@ -50,22 +50,22 @@ type CaptureStatus struct {
}

func newCaptureStatus(rev schedulepb.OwnerRevision) *CaptureStatus {
return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialize}
return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialized}
}

func (c *CaptureStatus) handleHeartbeatResponse(
resp *schedulepb.HeartbeatResponse, epoch schedulepb.ProcessorEpoch,
) {
// Check epoch for initialized captures.
if c.State != CaptureStateUninitialize && c.Epoch.Epoch != epoch.Epoch {
if c.State != CaptureStateUninitialized && c.Epoch.Epoch != epoch.Epoch {
log.Warn("tpscheduler: ignore heartbeat response",
zap.String("epoch", c.Epoch.Epoch),
zap.String("respEpoch", epoch.Epoch),
zap.Int64("ownerRev", c.OwnerRev.Revision))
return
}

if c.State == CaptureStateUninitialize {
if c.State == CaptureStateUninitialized {
c.Epoch = epoch
c.State = CaptureStateInitialized
}
Expand Down Expand Up @@ -112,7 +112,10 @@ func (c *captureManager) CheckAllCaptureInitialized() bool {

func (c *captureManager) checkAllCaptureInitialized() bool {
for _, captureStatus := range c.Captures {
if captureStatus.State == CaptureStateUninitialize {
// CaptureStateStopping is also considered initialized, because when
// a capture shutdown, it becomes stopping, we need to move its tables
// to other captures.
if captureStatus.State == CaptureStateUninitialized {
return false
}
}
Expand Down Expand Up @@ -190,7 +193,7 @@ func (c *captureManager) HandleAliveCaptureUpdate(
log.Info("tpscheduler: removed a capture", zap.String("capture", id))
delete(c.Captures, id)

// Only update changes after initializtion.
// Only update changes after initialization.
if !c.initialized {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/capture_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestCaptureStatusHandleHeartbeatResponse(t *testing.T) {
rev := schedulepb.OwnerRevision{Revision: 1}
epoch := schedulepb.ProcessorEpoch{Epoch: "test"}
c := newCaptureStatus(rev)
require.Equal(t, CaptureStateUninitialize, c.State)
require.Equal(t, CaptureStateUninitialized, c.State)

// Uninitialize -> Initialized
c.handleHeartbeatResponse(&schedulepb.HeartbeatResponse{}, epoch)
Expand Down

0 comments on commit 66096f6

Please sign in to comment.