From 11a6096dc07d29ca362b4c190d90257f6863996a Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Thu, 30 Jun 2022 13:32:36 +0800
Subject: [PATCH] cdc: skip owner resign when there is only one capture

When a capture is the only capture in the cluster, there is no other
capture to take over ownership or tables, so drainImpl now queries the
capture count first and reports the drain as done in that case.
Draining is also skipped when the two-phase scheduler is disabled, and
failed owner or table-count queries are now retried instead of being
ignored.

Signed-off-by: Neil Shen
---
 cdc/capture/capture.go      |  30 +++++++++-
 cdc/capture/capture_test.go | 114 +++++++++++++++++++++++++++++++++++-
 cdc/owner/owner.go          |   2 +
 3 files changed, 140 insertions(+), 6 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index 3a973d106f6..9898350a413 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -70,8 +70,8 @@ type captureImpl struct {
 	captureMu        sync.Mutex
 	info             *model.CaptureInfo
 	processorManager processor.Manager
-
-	liveness model.Liveness
+	liveness         model.Liveness
+	config           *config.ServerConfig
 
 	pdEndpoints     []string
 	UpstreamManager *upstream.Manager
@@ -111,6 +111,7 @@ type captureImpl struct {
 // NewCapture returns a new Capture instance
 func NewCapture(pdEnpoints []string, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) Capture {
 	return &captureImpl{
+		config:      config.GetGlobalServerConfig(),
 		liveness:    model.LivenessCaptureAlive,
 		EtcdClient:  etcdClient,
 		grpcService: grpcService,
@@ -598,9 +599,31 @@ func (c *captureImpl) Drain(ctx context.Context) <-chan struct{} {
 }
 
 func (c *captureImpl) drainImpl(ctx context.Context) bool {
+	if !c.config.Debug.EnableTwoPhaseScheduler {
+		// Skip draining as the two-phase scheduler is disabled.
+		return true
+	}
+
 	// Step 1, resign ownership.
 	o, _ := c.GetOwner()
 	if o != nil {
+		doneCh := make(chan error, 1)
+		query := &owner.Query{Tp: owner.QueryCaptures, Data: []*model.CaptureInfo{}}
+		o.Query(query, doneCh)
+		select {
+		case <-ctx.Done():
+		case err := <-doneCh:
+			if err != nil {
+				log.Warn("query capture count failed, retry", zap.Error(err))
+				return false
+			}
+		}
+		if len(query.Data.([]*model.CaptureInfo)) <= 1 {
+			// There is only one capture, the owner itself. It can neither
+			// resign ownership nor move tables out, so give up draining.
+			log.Warn("there is only one capture, skip drain")
+			return true
+		}
 		o.AsyncStop()
 		// Make sure it's not the owner before step 2.
 		return false
@@ -615,7 +638,8 @@
 	case <-ctx.Done():
 	case err := <-queryDone:
 		if err != nil {
-			log.Warn("query table count failed", zap.Error(err))
+			log.Warn("query table count failed, retry", zap.Error(err))
+			return false
 		}
 	}
 	select {
diff --git a/cdc/capture/capture_test.go b/cdc/capture/capture_test.go
index 368f7096a07..39bcb847038 100644
--- a/cdc/capture/capture_test.go
+++ b/cdc/capture/capture_test.go
@@ -15,14 +15,17 @@ package capture
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/golang/mock/gomock"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/owner"
 	mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
 	mock_processor "github.com/pingcap/tiflow/cdc/processor/mock"
+	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/etcd"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/client/pkg/v3/logutil"
@@ -84,7 +87,8 @@ func TestDrainImmediately(t *testing.T) {
 	ctx := context.Background()
 	ctrl := gomock.NewController(t)
 	mm := mock_processor.NewMockManager(ctrl)
-	cp := &captureImpl{processorManager: mm}
+	cp := &captureImpl{processorManager: mm, config: config.GetDefaultServerConfig()}
+	cp.config.Debug.EnableTwoPhaseScheduler = true
 	require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
 
 	// Drain completes immediately.
@@ -109,7 +113,8 @@ func TestDrainWaitsTables(t *testing.T) {
 	ctx := context.Background()
 	ctrl := gomock.NewController(t)
 	mm := mock_processor.NewMockManager(ctrl)
-	cp := &captureImpl{processorManager: mm}
+	cp := &captureImpl{processorManager: mm, config: config.GetDefaultServerConfig()}
+	cp.config.Debug.EnableTwoPhaseScheduler = true
 	require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
 
 	// Drain waits for moving out all tables.
@@ -152,10 +157,18 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	mo := mock_owner.NewMockOwner(ctrl)
 	mm := mock_processor.NewMockManager(ctrl)
-	cp := &captureImpl{processorManager: mm, owner: mo}
+	cp := &captureImpl{processorManager: mm, owner: mo, config: config.GetDefaultServerConfig()}
+	cp.config.Debug.EnableTwoPhaseScheduler = true
 	require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
 
 	ownerStopCh := make(chan struct{}, 1)
+	mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
+		query *owner.Query, done chan<- error,
+	) {
+		// Report two captures so that the owner is allowed to resign.
+		query.Data = []*model.CaptureInfo{{}, {}}
+		close(done)
+	}).AnyTimes()
 	mo.EXPECT().AsyncStop().Do(func() {
 		select {
 		case ownerStopCh <- struct{}{}:
@@ -190,3 +203,98 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
 		require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
 	}
 }
+
+func TestDrainOneCapture(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+	ctrl := gomock.NewController(t)
+	mo := mock_owner.NewMockOwner(ctrl)
+	mm := mock_processor.NewMockManager(ctrl)
+	cp := &captureImpl{processorManager: mm, owner: mo, config: config.GetDefaultServerConfig()}
+	cp.config.Debug.EnableTwoPhaseScheduler = true
+	require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
+
+	mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
+		query *owner.Query, done chan<- error,
+	) {
+		// Report a single capture so that the drain is skipped.
+		query.Data = []*model.CaptureInfo{{}}
+		close(done)
+	}).AnyTimes()
+
+	done := cp.Drain(ctx)
+
+	select {
+	case <-time.After(3 * time.Second):
+		require.Fail(t, "timeout")
+	case <-done:
+	}
+}
+
+func TestDrainErrors(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+	ctrl := gomock.NewController(t)
+	mo := mock_owner.NewMockOwner(ctrl)
+	mm := mock_processor.NewMockManager(ctrl)
+	cp := &captureImpl{processorManager: mm, owner: mo, config: config.GetDefaultServerConfig()}
+	cp.config.Debug.EnableTwoPhaseScheduler = true
+	require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
+
+	errQueryCall := mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
+		query *owner.Query, done chan<- error,
+	) {
+		done <- fmt.Errorf("test")
+		close(done)
+	})
+	ownerStopCh := make(chan struct{}, 1)
+	okQueryCall := mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
+		query *owner.Query, done chan<- error,
+	) {
+		// Report two captures so that the owner is allowed to resign.
+		query.Data = []*model.CaptureInfo{{}, {}}
+		close(done)
+	}).AnyTimes().After(errQueryCall)
+	mo.EXPECT().AsyncStop().Do(func() {
+		select {
+		case ownerStopCh <- struct{}{}:
+		default:
+		}
+	}).AnyTimes().After(okQueryCall)
+
+	errTableCall := mm.EXPECT().
+		QueryTableCount(gomock.Any(), gomock.Any(), gomock.Any()).
+		Do(func(ctx context.Context, tableCh chan int, done chan<- error) {
+			done <- fmt.Errorf("test")
+			close(done)
+		}).After(okQueryCall)
+	mm.EXPECT().
+		QueryTableCount(gomock.Any(), gomock.Any(), gomock.Any()).
+		Do(func(ctx context.Context, tableCh chan int, done chan<- error) {
+			tableCh <- 0
+			close(done)
+		}).After(errTableCall)
+
+	done := cp.Drain(ctx)
+
+	// Drain must wait for the owner to resign via AsyncStop.
+	select {
+	case <-ownerStopCh:
+		// Simulate that the owner has resigned.
+		require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
+		cp.setOwner(nil)
+	case <-time.After(3 * time.Second):
+		require.Fail(t, "timeout")
+	case <-done:
+		require.Fail(t, "unexpected")
+	}
+
+	select {
+	case <-time.After(3 * time.Second):
+		require.Fail(t, "timeout")
+	case <-done:
+		require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
+	}
+}
diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index c024e6e4dae..d2d365592ce 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -302,6 +302,8 @@ func (o *ownerImpl) AsyncStop() {
 	// Must be called after setting closed.
 	o.cleanupOwnerJob()
 	o.cleanStaleMetrics()
+
+	// FIXME: cleanup ownerJobQueue.
 }
 
 func (o *ownerImpl) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
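
Note for readers without the rest of the tiflow codebase: the diff above
shows only drainImpl, whose boolean result tells its caller whether the
drain has finished or should be retried. Below is a minimal,
self-contained Go sketch of that retry loop. It is illustrative only:
the drain helper, the 100ms poll interval, and the fake step function
are hypothetical stand-ins for the real captureImpl.Drain wrapper, which
this patch does not modify.

package main

import (
	"context"
	"fmt"
	"time"
)

// drain retries step until it reports completion, modeling how a Drain
// wrapper can turn drainImpl's bool result into a done channel: false
// means "not finished yet, poll again".
func drain(ctx context.Context, step func(context.Context) bool) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(100 * time.Millisecond) // hypothetical poll interval
		defer ticker.Stop()
		for !step(ctx) {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
	return done
}

func main() {
	// Fake step: the first call "resigns ownership" and returns false,
	// the second call finds zero tables and reports completion, echoing
	// the two-step flow exercised by TestDrainErrors.
	calls := 0
	step := func(context.Context) bool {
		calls++
		return calls >= 2
	}
	<-drain(context.Background(), step)
	fmt.Printf("drained after %d steps\n", calls)
}

Under this model, the new "return false" paths after the failed
capture-count and table-count queries simply schedule another poll,
while the single-capture case ends the loop immediately by returning
true.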