diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go
index 2bda3cf18b7..6eb9d82d224 100644
--- a/cdc/processor/pipeline/table.go
+++ b/cdc/processor/pipeline/table.go
@@ -73,7 +73,7 @@ type TableMeta struct {
 	TableID      model.TableID
 	CheckpointTs model.Ts
 	ResolvedTs   model.Ts
-	Status       TableState
+	State        TableState
 }
 
 // TablePipeline is a pipeline which capture the change log from tikv in a table
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index fd0255a0cdf..9e592360cf4 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -374,14 +374,14 @@ func (p *processor) GetTableMeta(tableID model.TableID) pipeline.TableMeta {
 			TableID:      tableID,
 			CheckpointTs: 0,
 			ResolvedTs:   0,
-			Status:       pipeline.TableStateAbsent,
+			State:        pipeline.TableStateAbsent,
 		}
 	}
 	return pipeline.TableMeta{
 		TableID:      tableID,
 		CheckpointTs: table.CheckpointTs(),
 		ResolvedTs:   table.ResolvedTs(),
-		Status:       table.Status(),
+		State:        table.Status(),
 	}
 }
diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go
index e6239012120..cf68b7183f5 100644
--- a/cdc/processor/processor_test.go
+++ b/cdc/processor/processor_test.go
@@ -260,6 +260,9 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) {
 	table1 := p.tables[1].(*mockTablePipeline)
 	require.Equal(t, model.Ts(20), table1.sinkStartTs)
 	require.Equal(t, pipeline.TableStateReplicating, table1.status)
+	meta := p.GetTableMeta(model.TableID(1))
+	require.Equal(t, model.TableID(1), meta.TableID)
+	require.Equal(t, pipeline.TableStateReplicating, meta.State)
 
 	ok, err = p.AddTable(ctx, 2, 20, false)
 	require.Nil(t, err)
@@ -380,6 +383,9 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) {
 	checkpointTs, done = p.IsRemoveTableFinished(ctx, 3)
 	require.True(t, done)
 	require.Equal(t, model.Ts(65), checkpointTs)
+	meta = p.GetTableMeta(model.TableID(3))
+	require.Equal(t, model.TableID(3), meta.TableID)
+	require.Equal(t, pipeline.TableStateAbsent, meta.State)
 
 	require.Len(t, p.tables, 3)
 	require.True(t, table3.canceled)
@@ -671,6 +677,3 @@ func TestUpdateBarrierTs(t *testing.T) {
 	tb = p.tables[model.TableID(1)].(*mockTablePipeline)
 	require.Equal(t, tb.barrierTs, uint64(15))
 }
-
-func TestGetTableMeta(t *testing.T) {
-}
diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go
index 3e743e1c70e..2f8dfe9257b 100644
--- a/cdc/scheduler/internal/tp/agent.go
+++ b/cdc/scheduler/internal/tp/agent.go
@@ -116,7 +116,7 @@ func NewAgent(ctx context.Context,
 			zap.String("namespace", changeFeedID.Namespace),
 			zap.String("changefeed", changeFeedID.ID),
 			zap.Error(err))
-		return nil, nil
+		return result, nil
 	}
 	return nil, err
 }
@@ -212,7 +212,7 @@ func (a *agent) tableStatus2PB(status pipeline.TableState) schedulepb.TableState
 
 func (a *agent) newTableStatus(tableID model.TableID) schedulepb.TableStatus {
 	meta := a.tableExec.GetTableMeta(tableID)
-	state := a.tableStatus2PB(meta.Status)
+	state := a.tableStatus2PB(meta.State)
 
 	if task, ok := a.runningTasks[tableID]; ok {
 		// remove table task is not processed, or failed,
@@ -295,8 +295,8 @@ func (a *agent) handleMessageDispatchTableRequest(
 			zap.String("capture", a.captureID),
 			zap.String("namespace", a.changeFeedID.Namespace),
 			zap.String("changefeed", a.changeFeedID.ID),
-			zap.Any("epoch", epoch),
-			zap.Any("expected", a.epoch))
+			zap.String("epoch", epoch.Epoch),
+			zap.String("expected", a.epoch.Epoch))
 		return
 	}
 	task := new(dispatchTableTask)
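The rename from `Status` to `State` ripples from `TableMeta` through `processor.GetTableMeta` and into the agent above. A minimal sketch of a caller of the new field, using only identifiers visible in this patch (the helper name is illustrative, not part of the change):

```go
package processor

import "github.com/pingcap/tiflow/cdc/processor/pipeline"

// isTableReplicating is a hypothetical helper, not part of this patch.
// GetTableMeta now reports TableStateAbsent for tables it does not track,
// so callers can branch on meta.State instead of checking for a missing table.
func isTableReplicating(meta pipeline.TableMeta) bool {
	return meta.State == pipeline.TableStateReplicating
}
```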
diff --git a/cdc/scheduler/internal/tp/agent_test.go b/cdc/scheduler/internal/tp/agent_test.go
index 8b84b79cd02..2a4f7ae4e80 100644
--- a/cdc/scheduler/internal/tp/agent_test.go
+++ b/cdc/scheduler/internal/tp/agent_test.go
@@ -583,6 +583,6 @@ func (e *MockTableExecutor) GetTableMeta(tableID model.TableID) pipeline.TableMe
 		TableID:      tableID,
 		CheckpointTs: 0,
 		ResolvedTs:   0,
-		Status:       state,
+		State:        state,
 	}
 }
diff --git a/cdc/scheduler/internal/tp/capture_manager.go b/cdc/scheduler/internal/tp/capture_manager.go
index c8823f644b5..b64500d6722 100644
--- a/cdc/scheduler/internal/tp/capture_manager.go
+++ b/cdc/scheduler/internal/tp/capture_manager.go
@@ -41,6 +41,16 @@ const (
 	CaptureStateStopping CaptureState = 3
 )
 
+var captureStateMap = map[CaptureState]string{
+	CaptureStateUninitialized: "CaptureStateUninitialized",
+	CaptureStateInitialized:   "CaptureStateInitialized",
+	CaptureStateStopping:      "CaptureStateStopping",
+}
+
+func (s CaptureState) String() string {
+	return captureStateMap[s]
+}
+
 // CaptureStatus represent capture's status.
 type CaptureStatus struct {
 	OwnerRev schedulepb.OwnerRevision
@@ -101,13 +111,7 @@ func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *capture
 }
 
 func (c *captureManager) CheckAllCaptureInitialized() bool {
-	if !c.checkAllCaptureInitialized() {
-		return false
-	}
-	if !c.initialized {
-		return false
-	}
-	return true
+	return c.initialized && c.checkAllCaptureInitialized()
 }
 
 func (c *captureManager) checkAllCaptureInitialized() bool {
diff --git a/cdc/scheduler/internal/tp/coordinator.go b/cdc/scheduler/internal/tp/coordinator.go
index 12fbe75c652..b242ef05b8b 100644
--- a/cdc/scheduler/internal/tp/coordinator.go
+++ b/cdc/scheduler/internal/tp/coordinator.go
@@ -29,16 +29,6 @@ import (
 const checkpointCannotProceed = internal.CheckpointCannotProceed
 
-type scheduler interface {
-	Name() string
-	Schedule(
-		checkpointTs model.Ts,
-		currentTables []model.TableID,
-		aliveCaptures map[model.CaptureID]*model.CaptureInfo,
-		replications map[model.TableID]*ReplicationSet,
-	) []*scheduleTask
-}
-
 var _ internal.Scheduler = (*coordinator)(nil)
 
 type coordinator struct {
@@ -46,7 +36,7 @@ type coordinator struct {
 	revision     schedulepb.OwnerRevision
 	captureID    model.CaptureID
 	trans        transport
-	scheduler    []scheduler
+	schedulers   map[schedulerType]scheduler
 	replicationM *replicationManager
 	captureM     *captureManager
 }
@@ -67,12 +57,18 @@ func NewCoordinator(
 		return nil, errors.Trace(err)
 	}
 	revision := schedulepb.OwnerRevision{Revision: ownerRevision}
+
+	schedulers := make(map[schedulerType]scheduler)
+	schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
+	schedulers[schedulerTypeMoveTable] = newMoveTableScheduler()
+	schedulers[schedulerTypeRebalance] = newRebalanceScheduler()
+
 	return &coordinator{
 		version:      version.ReleaseSemver(),
 		revision:     revision,
 		captureID:    captureID,
 		trans:        trans,
-		scheduler:    []scheduler{newBalancer()},
+		schedulers:   schedulers,
 		replicationM: newReplicationManager(cfg.MaxTaskConcurrency),
 		captureM:     newCaptureManager(revision, cfg.HeartbeatTick),
 	}, nil
@@ -90,11 +86,50 @@ func (c *coordinator) Tick(
 	return c.poll(ctx, checkpointTs, currentTables, aliveCaptures)
 }
 
-func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {}
+func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
+	if !c.captureM.CheckAllCaptureInitialized() {
+		log.Info("tpscheduler: manual move table task ignored, "+
+			"since not all captures initialized",
+			zap.Int64("tableID", tableID),
+			zap.String("targetCapture", target))
+		return
+	}
+	scheduler, ok := c.schedulers[schedulerTypeMoveTable]
+	if !ok {
+		log.Panic("tpscheduler: move table scheduler not found")
+	}
+	moveTableScheduler, ok := scheduler.(*moveTableScheduler)
+	if !ok {
+		log.Panic("tpscheduler: invalid move table scheduler found")
+	}
+	if !moveTableScheduler.addTask(tableID, target) {
+		log.Info("tpscheduler: manual move table task ignored, "+
+			"since the last triggered task is not finished",
+			zap.Int64("tableID", tableID),
+			zap.String("targetCapture", target))
+	}
+}
 
-func (c *coordinator) Rebalance() {}
+func (c *coordinator) Rebalance() {
+	if !c.captureM.CheckAllCaptureInitialized() {
+		log.Info("tpscheduler: manual rebalance task ignored, " +
+			"since not all captures initialized")
+		return
+	}
+	scheduler, ok := c.schedulers[schedulerTypeRebalance]
+	if !ok {
+		log.Panic("tpscheduler: rebalance scheduler not found")
+	}
+	rebalanceScheduler, ok := scheduler.(*rebalanceScheduler)
+	if !ok {
+		log.Panic("tpscheduler: invalid rebalance scheduler found")
+	}
+	rebalanceScheduler.rebalance = true
+}
 
-func (c *coordinator) Close(ctx context.Context) {}
+func (c *coordinator) Close(ctx context.Context) {
+	_ = c.trans.Close()
+}
 
 // ===========
@@ -138,12 +173,12 @@ func (c *coordinator) poll(
 	// Generate schedule tasks based on the current status.
 	replications := c.replicationM.ReplicationSets()
 	allTasks := make([]*scheduleTask, 0)
-	for _, sched := range c.scheduler {
-		tasks := sched.Schedule(checkpointTs, currentTables, aliveCaptures, replications)
+	for _, scheduler := range c.schedulers {
+		tasks := scheduler.Schedule(checkpointTs, currentTables, aliveCaptures, replications)
 		if len(tasks) != 0 {
 			log.Info("tpscheduler: new schedule task",
 				zap.Int("task", len(tasks)),
-				zap.String("scheduler", sched.Name()))
+				zap.String("scheduler", scheduler.Name()))
 		}
 		allTasks = append(allTasks, tasks...)
 	}
diff --git a/cdc/scheduler/internal/tp/coordinator_test.go b/cdc/scheduler/internal/tp/coordinator_test.go
index 9f3369627a7..944066d65ce 100644
--- a/cdc/scheduler/internal/tp/coordinator_test.go
+++ b/cdc/scheduler/internal/tp/coordinator_test.go
@@ -196,9 +196,11 @@ func BenchmarkCoordinatorInit(b *testing.B) {
 		for i := 0; i < total; i++ {
 			currentTables = append(currentTables, int64(10000+i))
 		}
+		schedulers := make(map[schedulerType]scheduler)
+		schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
 		coord = &coordinator{
 			trans:        &mockTrans{},
-			scheduler:    []scheduler{newBalancer()},
+			schedulers:   schedulers,
 			replicationM: newReplicationManager(10),
 			// Disable heartbeat.
 			captureM:     newCaptureManager(schedulepb.OwnerRevision{}, math.MaxInt),
@@ -228,9 +230,11 @@ func BenchmarkCoordinatorHeartbeat(b *testing.B) {
 		for i := 0; i < total; i++ {
 			currentTables = append(currentTables, int64(10000+i))
 		}
+		schedulers := make(map[schedulerType]scheduler)
+		schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
 		coord = &coordinator{
 			trans:        &mockTrans{},
-			scheduler:    []scheduler{},
+			schedulers:   schedulers,
 			replicationM: newReplicationManager(10),
 			captureM:     captureM,
 		}
@@ -296,9 +300,11 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
 			recvBuffer:     recvMsgs,
 			keepRecvBuffer: true,
 		}
+		schedulers := make(map[schedulerType]scheduler)
+		schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
 		coord = &coordinator{
 			trans:        trans,
-			scheduler:    []scheduler{},
+			schedulers:   schedulers,
 			replicationM: replicationM,
 			captureM:     captureM,
 		}
diff --git a/cdc/scheduler/internal/tp/replication_manager.go b/cdc/scheduler/internal/tp/replication_manager.go
index 0e571c86b07..e3176847794 100644
--- a/cdc/scheduler/internal/tp/replication_manager.go
+++ b/cdc/scheduler/internal/tp/replication_manager.go
@@ -117,7 +117,7 @@ func (r *replicationManager) HandleCaptureChanges(
 func (r *replicationManager) HandleMessage(
 	msgs []*schedulepb.Message,
 ) ([]*schedulepb.Message, error) {
-	sentMsgs := make([]*schedulepb.Message, 0)
+	sentMsgs := make([]*schedulepb.Message, 0, len(msgs))
 	for i := range msgs {
 		msg := msgs[i]
 		switch msg.MsgType {
@@ -230,7 +230,7 @@ func (r *replicationManager) HandleTasks(
 		}
 
 		// Check if accepting one more task exceeds maxTaskConcurrency.
-		if len(r.runningTasks)+1 > r.maxTaskConcurrency {
+		if len(r.runningTasks) == r.maxTaskConcurrency {
 			log.Debug("tpscheduler: too many running task")
 			// Does not use break, in case there is burst balance task
 			// in the remaining tasks.
@@ -366,6 +366,7 @@ func (r *replicationManager) handleBurstBalanceTasks(
 	return sentMsgs, nil
 }
 
+// ReplicationSets returns all tracked replication sets.
 // Caller must not modify the return replication sets.
 func (r *replicationManager) ReplicationSets() map[model.TableID]*ReplicationSet {
 	return r.tables
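The coordinator changes above replace the flat `[]scheduler` slice with a `map[schedulerType]scheduler`, and the manual `MoveTable`/`Rebalance` entry points reach scheduler-specific methods by looking up the map and down-casting. A sketch of that lookup pattern, with error returns instead of the `log.Panic` the patch uses (the helper name is illustrative):

```go
package tp

import "errors"

// moveTableSchedulerFrom is illustrative only; the coordinator inlines this
// lookup and calls log.Panic when the registry is mis-populated.
func moveTableSchedulerFrom(schedulers map[schedulerType]scheduler) (*moveTableScheduler, error) {
	s, ok := schedulers[schedulerTypeMoveTable]
	if !ok {
		return nil, errors.New("move table scheduler not registered")
	}
	m, ok := s.(*moveTableScheduler)
	if !ok {
		return nil, errors.New("unexpected scheduler registered under the move-table key")
	}
	return m, nil
}
```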
diff --git a/cdc/scheduler/internal/tp/scheduler.go b/cdc/scheduler/internal/tp/scheduler.go
new file mode 100644
index 00000000000..e8deaaaf6be
--- /dev/null
+++ b/cdc/scheduler/internal/tp/scheduler.go
@@ -0,0 +1,36 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tp
+
+import (
+	"github.com/pingcap/tiflow/cdc/model"
+)
+
+type scheduler interface {
+	Name() string
+	Schedule(
+		checkpointTs model.Ts,
+		currentTables []model.TableID,
+		aliveCaptures map[model.CaptureID]*model.CaptureInfo,
+		replications map[model.TableID]*ReplicationSet,
+	) []*scheduleTask
+}
+
+type schedulerType string
+
+const (
+	schedulerTypeBurstBalance schedulerType = "burst-balance-scheduler"
+	schedulerTypeMoveTable    schedulerType = "move-table-scheduler"
+	schedulerTypeRebalance    schedulerType = "rebalance-scheduler"
+)
diff --git a/cdc/scheduler/internal/tp/scheduler_balance.go b/cdc/scheduler/internal/tp/scheduler_balance.go
index d25d2e86a50..b5c44751abc 100644
--- a/cdc/scheduler/internal/tp/scheduler_balance.go
+++ b/cdc/scheduler/internal/tp/scheduler_balance.go
@@ -19,19 +19,19 @@ import (
 	"go.uber.org/zap"
 )
 
-var _ scheduler = &balancer{}
+var _ scheduler = &burstBalanceScheduler{}
 
-type balancer struct{}
+type burstBalanceScheduler struct{}
 
-func newBalancer() *balancer {
-	return &balancer{}
+func newBurstBalanceScheduler() *burstBalanceScheduler {
+	return &burstBalanceScheduler{}
 }
 
-func (b *balancer) Name() string {
-	return "balancer"
+func (b *burstBalanceScheduler) Name() string {
+	return string(schedulerTypeBurstBalance)
 }
 
-func (b *balancer) Schedule(
+func (b *burstBalanceScheduler) Schedule(
 	checkpointTs model.Ts,
 	currentTables []model.TableID,
 	captures map[model.CaptureID]*model.CaptureInfo,
@@ -117,7 +117,6 @@ func newBurstBalanceRemoveTables(
 		} else {
 			log.Warn("tpscheduler: primary or secondary not found for removed table",
 				zap.Any("table", rep))
-			continue
 		}
 	}
 	return &scheduleTask{burstBalance: &burstBalance{
diff --git a/cdc/scheduler/internal/tp/scheduler_balance_test.go b/cdc/scheduler/internal/tp/scheduler_balance_test.go
index ebbfe90006e..42255269890 100644
--- a/cdc/scheduler/internal/tp/scheduler_balance_test.go
+++ b/cdc/scheduler/internal/tp/scheduler_balance_test.go
@@ -29,7 +29,7 @@ func TestSchedulerBalance(t *testing.T) {
 
 	// AddTable only
 	replications := map[model.TableID]*ReplicationSet{}
-	b := newBalancer()
+	b := newBurstBalanceScheduler()
 	tasks := b.Schedule(0, currentTables, captures, replications)
 	require.Len(t, tasks, 1)
 	require.Contains(t, tasks[0].burstBalance.AddTables, model.TableID(1))
@@ -96,7 +96,7 @@ func benchmarkSchedulerBalance(
 	size := 16384
 	for total := 1; total <= size; total *= 2 {
 		name, currentTables, captures, replications := factory(total)
-		bal := newBalancer()
+		bal := newBurstBalanceScheduler()
 		b.ResetTimer()
 		b.Run(name, func(b *testing.B) {
 			for i := 0; i < b.N; i++ {
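With the `scheduler` interface and the `schedulerType` constants factored out into the new `scheduler.go`, adding another scheduler only requires implementing `Name` and `Schedule` and registering it in `NewCoordinator`. A no-op sketch of the contract (the `noopScheduler` name is illustrative, not part of the patch):

```go
package tp

import "github.com/pingcap/tiflow/cdc/model"

// noopScheduler is illustrative only: it satisfies the scheduler interface
// but never proposes any task.
type noopScheduler struct{}

func (s *noopScheduler) Name() string { return "noop-scheduler" }

func (s *noopScheduler) Schedule(
	checkpointTs model.Ts,
	currentTables []model.TableID,
	aliveCaptures map[model.CaptureID]*model.CaptureInfo,
	replications map[model.TableID]*ReplicationSet,
) []*scheduleTask {
	// A scheduler may return nil or an empty slice when it has nothing to do.
	return nil
}

var _ scheduler = &noopScheduler{}
```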
diff --git a/cdc/scheduler/internal/tp/scheduler_move_table.go b/cdc/scheduler/internal/tp/scheduler_move_table.go
new file mode 100644
index 00000000000..5d490df4ec6
--- /dev/null
+++ b/cdc/scheduler/internal/tp/scheduler_move_table.go
@@ -0,0 +1,129 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tp
+
+import (
+	"sync"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"go.uber.org/zap"
+)
+
+var _ scheduler = &moveTableScheduler{}
+
+type moveTableScheduler struct {
+	mu    sync.Mutex
+	tasks map[model.TableID]*scheduleTask
+}
+
+func newMoveTableScheduler() *moveTableScheduler {
+	return &moveTableScheduler{
+		tasks: make(map[model.TableID]*scheduleTask),
+	}
+}
+
+func (m *moveTableScheduler) Name() string {
+	return string(schedulerTypeMoveTable)
+}
+
+func (m *moveTableScheduler) addTask(tableID model.TableID, target model.CaptureID) bool {
+	// if a previously triggered task has not been accepted yet, decline the new manual move table request.
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if _, ok := m.tasks[tableID]; ok {
+		return false
+	}
+	m.tasks[tableID] = &scheduleTask{
+		moveTable: &moveTable{
+			TableID:     tableID,
+			DestCapture: target,
+		},
+		accept: func() {
+			m.mu.Lock()
+			defer m.mu.Unlock()
+			delete(m.tasks, tableID)
+		},
+	}
+	return true
+}
+
+func (m *moveTableScheduler) Schedule(
+	checkpointTs model.Ts,
+	currentTables []model.TableID,
+	captures map[model.CaptureID]*model.CaptureInfo,
+	replications map[model.TableID]*ReplicationSet,
+) []*scheduleTask {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	result := make([]*scheduleTask, 0)
+
+	if len(m.tasks) == 0 {
+		return result
+	}
+
+	if len(captures) == 0 {
+		return result
+	}
+
+	allTables := make(map[model.TableID]struct{})
+	for _, tableID := range currentTables {
+		allTables[tableID] = struct{}{}
+	}
+
+	for tableID, task := range m.tasks {
+		// the table may not be in the current tables
+		// if it was removed after the manual move was triggered.
+		if _, ok := allTables[tableID]; !ok {
+			log.Warn("tpscheduler: move table ignored, since the table cannot be found",
+				zap.Int64("tableID", tableID),
+				zap.String("captureID", task.moveTable.DestCapture))
+			delete(m.tasks, tableID)
+			continue
+		}
+		// the target capture may go offline after the manual move was triggered.
+		_, ok := captures[task.moveTable.DestCapture]
+		if !ok {
+			log.Info("tpscheduler: move table ignored, since the target capture cannot be found",
+				zap.Int64("tableID", tableID),
+				zap.String("captureID", task.moveTable.DestCapture))
+			delete(m.tasks, tableID)
+			continue
+		}
+		rep, ok := replications[tableID]
+		if !ok {
+			log.Warn("tpscheduler: move table ignored, "+
+				"since the table cannot be found in the replication set",
+				zap.Int64("tableID", tableID),
+				zap.String("captureID", task.moveTable.DestCapture))
+			delete(m.tasks, tableID)
+			continue
+		}
+		// only move tables that are replicating.
+		if rep.State != ReplicationSetStateReplicating {
+			log.Info("tpscheduler: move table ignored, since the table is not replicating now",
+				zap.Int64("tableID", tableID),
+				zap.String("captureID", task.moveTable.DestCapture),
+				zap.Any("replicationState", rep.State))
+			delete(m.tasks, tableID)
+		}
+	}
+
+	for _, task := range m.tasks {
+		result = append(result, task)
+	}
+
+	return result
+}
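How `addTask` and `Schedule` interact: a queued manual move stays pending, and further requests for the same table are declined, until the emitted task's `accept` callback fires; the pending move is silently dropped when the table, the target capture, or a replicating replication set cannot be found. A usage sketch along the lines of the unit test that follows (the function name is illustrative):

```go
package tp

import "github.com/pingcap/tiflow/cdc/model"

// manualMoveExample is illustrative only; it mirrors the unit test below.
func manualMoveExample() []*scheduleTask {
	s := newMoveTableScheduler()
	captures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}}
	replications := map[model.TableID]*ReplicationSet{
		1: {State: ReplicationSetStateReplicating, Primary: "a"},
	}
	// Queue a manual move; a second addTask for table 1 would return false
	// until the emitted task's accept callback has fired.
	s.addTask(model.TableID(1), "b")
	// Expected: one moveTable task with DestCapture "b".
	return s.Schedule(0, []model.TableID{1}, captures, replications)
}
```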
diff --git a/cdc/scheduler/internal/tp/scheduler_move_table_test.go b/cdc/scheduler/internal/tp/scheduler_move_table_test.go
new file mode 100644
index 00000000000..7a7d8bf0d3c
--- /dev/null
+++ b/cdc/scheduler/internal/tp/scheduler_move_table_test.go
@@ -0,0 +1,71 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tp
+
+import (
+	"testing"
+
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSchedulerMoveTable(t *testing.T) {
+	t.Parallel()
+
+	var checkpointTs model.Ts
+	captures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}}
+	currentTables := []model.TableID{1, 2, 3, 4}
+
+	replications := map[model.TableID]*ReplicationSet{
+		1: {State: ReplicationSetStateReplicating, Primary: "a"},
+	}
+
+	scheduler := newMoveTableScheduler()
+	require.Equal(t, "move-table-scheduler", scheduler.Name())
+
+	tasks := scheduler.Schedule(checkpointTs, currentTables, map[model.CaptureID]*model.CaptureInfo{}, replications)
+	require.Len(t, tasks, 0)
+
+	scheduler.addTask(model.TableID(0), "a")
+	tasks = scheduler.Schedule(checkpointTs, currentTables, map[model.CaptureID]*model.CaptureInfo{}, replications)
+	require.Len(t, tasks, 0)
+
+	// move a table that does not exist
+	scheduler.addTask(model.TableID(0), "a")
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 0)
+
+	// move a table to a capture that does not exist
+	scheduler.addTask(model.TableID(1), "c")
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 0)
+
+	// move a table that is not replicating
+	scheduler.addTask(model.TableID(1), "b")
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, map[model.TableID]*ReplicationSet{})
+	require.Len(t, tasks, 0)
+
+	scheduler.addTask(model.TableID(1), "b")
+	replications[model.TableID(1)].State = ReplicationSetStatePrepare
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 0)
+
+	scheduler.addTask(model.TableID(1), "b")
+	replications[model.TableID(1)].State = ReplicationSetStateReplicating
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 1)
+	require.Equal(t, model.TableID(1), tasks[0].moveTable.TableID)
+	require.Equal(t, "b", tasks[0].moveTable.DestCapture)
+	require.Equal(t, scheduler.tasks[model.TableID(1)], tasks[0])
+}
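The rebalance scheduler added in the next file caps every capture at `ceil(len(replications)/len(captures))` tables and marks the excess as victims: for example, 6 replicating tables over 2 captures gives an upper limit of 3, so a capture currently holding 4 tables gives up exactly one. A minimal sketch of that arithmetic (the function name is illustrative, not part of the patch):

```go
package tp

import "math"

// victimCount is illustrative only: the number of tables a capture must give
// up under the rebalance scheduler's per-capture upper limit.
func victimCount(tablesOnCapture, totalTables, totalCaptures int) int {
	upperLimit := int(math.Ceil(float64(totalTables) / float64(totalCaptures)))
	if n := tablesOnCapture - upperLimit; n > 0 {
		return n // e.g. victimCount(4, 6, 2) == 1
	}
	return 0
}
```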
diff --git a/cdc/scheduler/internal/tp/scheduler_rebalance.go b/cdc/scheduler/internal/tp/scheduler_rebalance.go
new file mode 100644
index 00000000000..b11e8b97581
--- /dev/null
+++ b/cdc/scheduler/internal/tp/scheduler_rebalance.go
@@ -0,0 +1,223 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tp
+
+import (
+	"math"
+	"math/rand"
+	"sort"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"go.uber.org/zap"
+)
+
+var _ scheduler = &rebalanceScheduler{}
+
+type rebalanceScheduler struct {
+	rebalance bool
+	random    *rand.Rand
+
+	tasks map[model.TableID]*scheduleTask
+}
+
+func newRebalanceScheduler() *rebalanceScheduler {
+	return &rebalanceScheduler{
+		rebalance: false,
+		random:    rand.New(rand.NewSource(time.Now().UnixNano())),
+		tasks:     make(map[model.TableID]*scheduleTask),
+	}
+}
+
+func (r *rebalanceScheduler) Name() string {
+	return string(schedulerTypeRebalance)
+}
+
+func (r *rebalanceScheduler) Schedule(
+	checkpointTs model.Ts,
+	currentTables []model.TableID,
+	captures map[model.CaptureID]*model.CaptureInfo,
+	replications map[model.TableID]*ReplicationSet,
+) []*scheduleTask {
+	result := make([]*scheduleTask, 0)
+
+	// if rebalance is not triggered, or there are still pending tasks,
+	// do not generate new tasks.
+	if !r.rebalance || len(r.tasks) != 0 {
+		return result
+	}
+
+	if len(captures) == 0 {
+		return result
+	}
+
+	// only rebalance when all tables are replicating
+	for _, tableID := range currentTables {
+		rep, ok := replications[tableID]
+		if !ok {
+			return result
+		}
+		if rep.State != ReplicationSetStateReplicating {
+			return result
+		}
+	}
+
+	tablesPerCapture := make(map[model.CaptureID]*tableSet)
+	for captureID := range captures {
+		tablesPerCapture[captureID] = newTableSet()
+	}
+
+	for tableID, rep := range replications {
+		if rep.Primary == "" {
+			log.Panic("tpscheduler: rebalance found a table without a primary",
+				zap.Int64("tableID", tableID),
+				zap.Any("replication", rep))
+		}
+		tablesPerCapture[rep.Primary].add(tableID)
+	}
+
+	// find the victim tables that need to be moved
+	upperLimitPerCapture := int(math.Ceil(float64(len(replications)) / float64(len(captures))))
+
+	victims := make([]model.TableID, 0)
+	for _, ts := range tablesPerCapture {
+		tables := ts.keys()
+		if r.random != nil {
+			// Complexity note: Shuffle has O(n), where `n` is the number of tables.
+			// Also, during a single call of `Schedule`, Shuffle can be called at most
+			// `c` times, where `c` is the number of captures (TiCDC nodes).
+			// Since it is only called when a rebalance is triggered, which happens rarely,
+			// we do not expect a performance degradation as a result of adding
+			// the randomness.
+			r.random.Shuffle(len(tables), func(i, j int) {
+				tables[i], tables[j] = tables[j], tables[i]
+			})
+		} else {
+			// sort the tableIDs here so that the result is deterministic,
+			// which would aid testing and debugging.
+			sort.Slice(tables, func(i, j int) bool {
+				return tables[i] < tables[j]
+			})
+		}
+
+		tableNum2Remove := len(tables) - upperLimitPerCapture
+		if tableNum2Remove <= 0 {
+			continue
+		}
+
+		for _, table := range tables {
+			if tableNum2Remove <= 0 {
+				break
+			}
+			victims = append(victims, table)
+			ts.remove(table)
+			tableNum2Remove--
+		}
+	}
+
+	captureWorkload := make(map[model.CaptureID]int)
+	for captureID, ts := range tablesPerCapture {
+		captureWorkload[captureID] = r.randomizeWorkload(ts.size())
+	}
+	// for each victim table, find the target for it
+	for _, tableID := range victims {
+		target := ""
+		minWorkload := math.MaxInt64
+
+		for captureID, workload := range captureWorkload {
+			if workload < minWorkload {
+				minWorkload = workload
+				target = captureID
+			}
+		}
+
+		if minWorkload == math.MaxInt64 {
+			log.Panic("tpscheduler: rebalance met an unexpected min workload " +
+				"when trying to find the target capture")
+		}
+
+		task := &scheduleTask{
+			moveTable: &moveTable{
+				TableID:     tableID,
+				DestCapture: target,
+			},
+			accept: func() {
+				delete(r.tasks, tableID)
+				if len(r.tasks) == 0 {
+					r.rebalance = false
+				}
+			},
+		}
+		r.tasks[tableID] = task
+
+		result = append(result, task)
+
+		tablesPerCapture[target].add(tableID)
+		captureWorkload[target] = r.randomizeWorkload(tablesPerCapture[target].size())
+	}
+
+	return result
+}
+
+const (
+	randomPartBitSize = 8
+	randomPartMask    = (1 << randomPartBitSize) - 1
+)
+
+// randomizeWorkload injects small randomness into the workload, so that
+// when two captures are tied in competing for the minimum workload, the result
+// will not always be the same.
+// The bitwise layout of the return value is:
+// 63                8                0
+// |----- input -----|-- random val --|
+func (r *rebalanceScheduler) randomizeWorkload(input int) int {
+	var randomPart int
+	if r.random != nil {
+		randomPart = int(r.random.Uint32() & randomPartMask)
+	}
+	// randomPart is a small random value that only affects the
+	// result of comparison of workloads when two workloads are equal.
+	return (input << randomPartBitSize) | randomPart
+}
+
+type tableSet struct {
+	memo map[model.TableID]struct{}
+}
+
+func newTableSet() *tableSet {
+	return &tableSet{
+		memo: make(map[model.TableID]struct{}),
+	}
+}
+
+func (s *tableSet) add(tableID model.TableID) {
+	s.memo[tableID] = struct{}{}
+}
+
+func (s *tableSet) remove(tableID model.TableID) {
+	delete(s.memo, tableID)
+}
+
+func (s *tableSet) keys() []model.TableID {
+	result := make([]model.TableID, 0, len(s.memo))
+	for k := range s.memo {
+		result = append(result, k)
+	}
+	return result
+}
+
+func (s *tableSet) size() int {
+	return len(s.memo)
+}
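`randomizeWorkload` keeps the real table count in the high bits and uses the low 8 bits purely as a tie-breaker, so a capture with strictly fewer tables always wins the `workload < minWorkload` comparison. A small illustration of the bit layout (the values and the function name are made up for illustration):

```go
package tp

import "fmt"

// bitLayoutExample is illustrative only.
func bitLayoutExample() {
	const tieBreakerA, tieBreakerB = 0x7f, 0x01 // arbitrary low-bit noise
	workloadA := (3 << randomPartBitSize) | tieBreakerA // capture with 3 tables
	workloadB := (4 << randomPartBitSize) | tieBreakerB // capture with 4 tables
	// The random low byte can never flip the ordering of different table counts.
	fmt.Println(workloadA < workloadB) // always true
}
```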
diff --git a/cdc/scheduler/internal/tp/scheduler_rebalance_test.go b/cdc/scheduler/internal/tp/scheduler_rebalance_test.go
new file mode 100644
index 00000000000..3ae8419c0cf
--- /dev/null
+++ b/cdc/scheduler/internal/tp/scheduler_rebalance_test.go
@@ -0,0 +1,84 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tp
+
+import (
+	"testing"
+
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSchedulerRebalance(t *testing.T) {
+	t.Parallel()
+
+	var checkpointTs model.Ts
+	captures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}}
+	currentTables := []model.TableID{1, 2, 3, 4}
+
+	replications := map[model.TableID]*ReplicationSet{
+		1: {State: ReplicationSetStateReplicating, Primary: "a"},
+		2: {State: ReplicationSetStateCommit, Secondary: "b"},
+		3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"},
+		4: {State: ReplicationSetStateAbsent},
+	}
+
+	scheduler := newRebalanceScheduler()
+	require.Equal(t, "rebalance-scheduler", scheduler.Name())
+	// rebalance is not triggered
+	tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 0)
+
+	scheduler.rebalance = true
+	// no captures
+	tasks = scheduler.Schedule(checkpointTs, currentTables, map[model.CaptureID]*model.CaptureInfo{}, replications)
+	require.Len(t, tasks, 0)
+
+	// table not in the replication set
+	tasks = scheduler.Schedule(checkpointTs, []model.TableID{0}, captures, replications)
+	require.Len(t, tasks, 0)
+
+	// not all tables are replicating
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 0)
+
+	// table distribution is balanced, should have no task.
+	replications = map[model.TableID]*ReplicationSet{
+		1: {State: ReplicationSetStateReplicating, Primary: "a"},
+		2: {State: ReplicationSetStateReplicating, Primary: "a"},
+		3: {State: ReplicationSetStateReplicating, Primary: "b"},
+		4: {State: ReplicationSetStateReplicating, Primary: "b"},
+	}
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 0)
+
+	replications[5] = &ReplicationSet{
+		State:   ReplicationSetStateReplicating,
+		Primary: "a",
+	}
+	replications[6] = &ReplicationSet{
+		State:   ReplicationSetStateReplicating,
+		Primary: "a",
+	}
+
+	scheduler.random = nil // disable randomness to make the test deterministic.
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 1)
+	require.Equal(t, model.TableID(1), tasks[0].moveTable.TableID)
+	require.Equal(t, "b", tasks[0].moveTable.DestCapture)
+
+	// pending task is not consumed yet, this turn should have no tasks.
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 0)
+}
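Putting the pieces together, the manual rebalance flow exercised by the test above is: the owner calls `coordinator.Rebalance()`, which flips the flag on the registered `rebalanceScheduler`; the next `poll` pass asks every scheduler in the map for tasks; the rebalance scheduler emits `moveTable` tasks and is designed to clear its flag once they have all been accepted. A condensed sketch of that flow, outside the coordinator and for illustration only:

```go
package tp

import "github.com/pingcap/tiflow/cdc/model"

// rebalanceFlowExample is illustrative only; in production the coordinator
// drives these steps from Rebalance() and poll().
func rebalanceFlowExample(
	currentTables []model.TableID,
	captures map[model.CaptureID]*model.CaptureInfo,
	replications map[model.TableID]*ReplicationSet,
) {
	r := newRebalanceScheduler()
	r.rebalance = true // what coordinator.Rebalance() does via the scheduler map
	tasks := r.Schedule(0, currentTables, captures, replications)
	for _, task := range tasks {
		// The accept callbacks are meant to drain the pending queue and,
		// once it is empty, reset r.rebalance for the next manual trigger.
		task.accept()
	}
}
```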