Skip to content

Commit

Permalink
cdc: remove verbose log and set liveness stop after resign owner
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Jun 30, 2022
1 parent 4623fc0 commit 2680fc9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 7 deletions.
7 changes: 5 additions & 2 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,6 @@ func (c *captureImpl) AsyncClose() {

// Drain removes tables in the current TiCDC instance.
func (c *captureImpl) Drain(ctx context.Context) <-chan struct{} {
// Set liveness stopping, owners will move out all tables in the capture.
c.liveness.Store(model.LivenessCaptureStopping)
log.Info("draining capture, removing all tables in the capture")

const drainInterval = 100 * time.Millisecond
Expand All @@ -588,6 +586,9 @@ func (c *captureImpl) Drain(ctx context.Context) <-chan struct{} {
ticker.Reset(drainInterval)
select {
case <-ctx.Done():
// Give up when the context is canceled. In the current
// implementation, this happens when TiCDC receives a second signal
// and begins a forced shutdown.
return
case <-ticker.C:
}
Expand All @@ -605,6 +606,8 @@ func (c *captureImpl) drainImpl(ctx context.Context) bool {
return false
}
// Step 2, wait for moving out all tables.
// Set liveness stopping, owners will move out all tables in the capture.
c.liveness.Store(model.LivenessCaptureStopping)
queryDone := make(chan error, 1)
tableCh := make(chan int, 1)
c.processorManager.QueryTableCount(ctx, tableCh, queryDone)
Expand Down
7 changes: 4 additions & 3 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func TestDrainImmediately(t *testing.T) {
close(done)
})
done := cp.Drain(ctx)
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
select {
case <-done:
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
case <-time.After(time.Second):
require.Fail(t, "timeout")
}
Expand Down Expand Up @@ -136,9 +136,9 @@ func TestDrainWaitsTables(t *testing.T) {
close(done)
}).After(t1)
done := cp.Drain(ctx)
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
select {
case <-done:
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
require.EqualValues(t, 3, calls)
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
Expand Down Expand Up @@ -170,12 +170,12 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
})

done := cp.Drain(ctx)
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())

// Must wait for the owner to resign by waiting for async close.
select {
case <-ownerStopCh:
// Simulate owner has resigned.
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
cp.setOwner(nil)
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
Expand All @@ -187,5 +187,6 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
case <-done:
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
}
}
5 changes: 4 additions & 1 deletion cdc/scheduler/internal/tp/scheduler_drain_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (d *drainCaptureScheduler) Schedule(
defer d.mu.Unlock()

if d.target == captureIDNotDraining {
// There are two ways to make a capture "stopping",
// 1. PUT /api/v1/capture/drain
// 2. kill <TiCDC_PID>
stopping := checkStoppingCapture(captures)
if stopping == captureIDNotDraining {
return nil
Expand Down Expand Up @@ -145,7 +148,7 @@ func (d *drainCaptureScheduler) Schedule(
// 3. the target capture cannot be found in the latest captures
if len(victims) == 0 {
log.Info("tpscheduler: drain capture scheduler finished, since no table",
zap.String("target", d.target), zap.Any("captures", captures))
zap.String("target", d.target))
d.target = captureIDNotDraining
return nil
}
Expand Down
19 changes: 19 additions & 0 deletions cdc/scheduler/internal/tp/scheduler_drain_capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,22 @@ func TestDrainStoppingCapture(t *testing.T) {
require.EqualValues(t, "a", tasks[0].moveTable.DestCapture)
require.EqualValues(t, "b", scheduler.getTarget())
}

// TestDrainSkipOwner checks that the drain capture scheduler produces no
// tasks and keeps its target at captureIDNotDraining when the only capture
// in the stopping state is also marked as the owner.
func TestDrainSkipOwner(t *testing.T) {
	t.Parallel()

	// Capture "b" is both the owner and in the stopping state; "a" is a
	// plain capture with zero-valued status.
	captureStatuses := map[model.CaptureID]*CaptureStatus{
		"a": {},
		"b": {IsOwner: true, State: CaptureStateStopping},
	}
	// One replicating table on each capture.
	replicationSets := map[model.TableID]*ReplicationSet{
		1: {State: ReplicationSetStateReplicating, Primary: "a"},
		2: {State: ReplicationSetStateReplicating, Primary: "b"},
	}

	var ts model.Ts
	s := newDrainCaptureScheduler(10)
	got := s.Schedule(ts, []model.TableID{}, captureStatuses, replicationSets)

	// Nothing is scheduled and no drain target is selected.
	require.Len(t, got, 0)
	require.EqualValues(t, captureIDNotDraining, s.getTarget())
}
1 change: 0 additions & 1 deletion pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (o *options) run(cmd *cobra.Command) error {
return errors.Annotate(err, "new server")
}
// Drain the server before shutdown.

shutdownNotify := func() <-chan struct{} { return server.Drain(ctx) }
util.InitSignalHandling(shutdownNotify, cancel)

Expand Down

0 comments on commit 2680fc9

Please sign in to comment.