diff --git a/cdc/scheduler/internal/tp/capture_manager.go b/cdc/scheduler/internal/tp/capture_manager.go index a3c0fa191cd..75a2c0ad8e1 100644 --- a/cdc/scheduler/internal/tp/capture_manager.go +++ b/cdc/scheduler/internal/tp/capture_manager.go @@ -215,6 +215,9 @@ func (c *captureManager) HandleAliveCaptureUpdate( c.changes.Removed = make(map[string][]schedulepb.TableStatus) } c.changes.Removed[id] = capture.Tables + + cf := c.changefeedID + captureTableGauge.DeleteLabelValues(cf.Namespace, cf.ID, capture.Addr) } } diff --git a/cdc/scheduler/internal/tp/coordinator.go b/cdc/scheduler/internal/tp/coordinator.go index 911f3fc3465..1e9bd90a693 100644 --- a/cdc/scheduler/internal/tp/coordinator.go +++ b/cdc/scheduler/internal/tp/coordinator.go @@ -83,7 +83,8 @@ func newCoordinator( ) *coordinator { revision := schedulepb.OwnerRevision{Revision: ownerRevision} schedulers := make(map[schedulerType]scheduler) - schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler() + schedulers[schedulerTypeBasic] = newBasicScheduler() + schedulers[schedulerTypeBalance] = newBalanceScheduler(cfg) schedulers[schedulerTypeMoveTable] = newMoveTableScheduler() schedulers[schedulerTypeRebalance] = newRebalanceScheduler() diff --git a/cdc/scheduler/internal/tp/coordinator_test.go b/cdc/scheduler/internal/tp/coordinator_test.go index d8122df76d2..7af5503cad8 100644 --- a/cdc/scheduler/internal/tp/coordinator_test.go +++ b/cdc/scheduler/internal/tp/coordinator_test.go @@ -194,7 +194,8 @@ func TestCoordinatorHeartbeat(t *testing.T) { require.Nil(t, err) require.True(t, coord.captureM.CheckAllCaptureInitialized()) msgs = trans.sendBuffer - require.Len(t, msgs, 1) + require.Len(t, msgs, 2) + // Basic scheduler, make sure all tables get replicated. require.EqualValues(t, 3, msgs[0].DispatchTableRequest.GetAddTable().TableID) require.Len(t, coord.replicationM.tables, 3) } @@ -307,7 +308,7 @@ func benchmarkCoordinator( ) { log.SetLevel(zapcore.DPanicLevel) ctx := context.Background() - size := 16384 + size := 131072 // 2^17 for total := 1; total <= size; total *= 2 { name, coord, currentTables, captures := factory(total) b.ResetTimer() @@ -337,7 +338,7 @@ func BenchmarkCoordinatorInit(b *testing.B) { currentTables = append(currentTables, int64(10000+i)) } schedulers := make(map[schedulerType]scheduler) - schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler() + schedulers[schedulerTypeBasic] = newBasicScheduler() coord = &coordinator{ trans: &mockTrans{}, schedulers: schedulers, @@ -377,7 +378,7 @@ func BenchmarkCoordinatorHeartbeat(b *testing.B) { currentTables = append(currentTables, int64(10000+i)) } schedulers := make(map[schedulerType]scheduler) - schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler() + schedulers[schedulerTypeBasic] = newBasicScheduler() coord = &coordinator{ trans: &mockTrans{}, schedulers: schedulers, @@ -452,7 +453,7 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) { keepRecvBuffer: true, } schedulers := make(map[schedulerType]scheduler) - schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler() + schedulers[schedulerTypeBasic] = newBasicScheduler() coord = &coordinator{ trans: trans, schedulers: schedulers, diff --git a/cdc/scheduler/internal/tp/scheduler.go b/cdc/scheduler/internal/tp/scheduler.go index e8deaaaf6be..3fc18c4d8b8 100644 --- a/cdc/scheduler/internal/tp/scheduler.go +++ b/cdc/scheduler/internal/tp/scheduler.go @@ -30,7 +30,8 @@ type scheduler interface { type schedulerType string const ( - schedulerTypeBurstBalance schedulerType = 
"burst-balance-scheduler" - schedulerTypeMoveTable schedulerType = "move-table-scheduler" - schedulerTypeRebalance schedulerType = "rebalance-scheduler" + schedulerTypeBasic schedulerType = "basic-scheduler" + schedulerTypeBalance schedulerType = "balance-scheduler" + schedulerTypeMoveTable schedulerType = "move-table-scheduler" + schedulerTypeRebalance schedulerType = "rebalance-scheduler" ) diff --git a/cdc/scheduler/internal/tp/scheduler_balance.go b/cdc/scheduler/internal/tp/scheduler_balance.go index a777027cda5..144580f1308 100644 --- a/cdc/scheduler/internal/tp/scheduler_balance.go +++ b/cdc/scheduler/internal/tp/scheduler_balance.go @@ -17,46 +17,62 @@ import ( "math/rand" "time" - "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "go.uber.org/zap" + "github.com/pingcap/tiflow/pkg/config" ) -var _ scheduler = &burstBalanceScheduler{} +var _ scheduler = &balanceScheduler{} -type burstBalanceScheduler struct { - random *rand.Rand +// The scheduler for balancing tables of each captures. +type balanceScheduler struct { + random *rand.Rand + lastRebalanceTime time.Time + checkBalanceInterval time.Duration } -func newBurstBalanceScheduler() *burstBalanceScheduler { - return &burstBalanceScheduler{ - random: rand.New(rand.NewSource(time.Now().UnixNano())), +func newBalanceScheduler(cfg *config.SchedulerConfig) *balanceScheduler { + return &balanceScheduler{ + random: rand.New(rand.NewSource(time.Now().UnixNano())), + checkBalanceInterval: time.Duration(cfg.CheckBalanceInterval), } } -func (b *burstBalanceScheduler) Name() string { - return string(schedulerTypeBurstBalance) +func (b *balanceScheduler) Name() string { + return string(schedulerTypeBalance) } -func (b *burstBalanceScheduler) Schedule( +func (b *balanceScheduler) Schedule( checkpointTs model.Ts, currentTables []model.TableID, captures map[model.CaptureID]*model.CaptureInfo, replications map[model.TableID]*ReplicationSet, ) []*scheduleTask { - newTables := make([]model.TableID, 0) - intersectionTable := make(map[model.TableID]struct{}) + now := time.Now() + if now.Sub(b.lastRebalanceTime) < b.checkBalanceInterval { + return nil + } + b.lastRebalanceTime = now + + tasks := make([]*scheduleTask, 0) + task := buildBurstBalanceMoveTables(b.random, currentTables, captures, replications) + if task != nil { + tasks = append(tasks, task) + } + return tasks +} + +func buildBurstBalanceMoveTables( + random *rand.Rand, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + replications map[model.TableID]*ReplicationSet, +) *scheduleTask { captureTables := make(map[model.CaptureID][]model.TableID) for _, tableID := range currentTables { rep, ok := replications[tableID] if !ok { - newTables = append(newTables, tableID) continue } - if rep.State == ReplicationSetStateAbsent { - newTables = append(newTables, tableID) - } - intersectionTable[tableID] = struct{}{} if rep.Primary != "" { captureTables[rep.Primary] = append(captureTables[rep.Primary], tableID) } @@ -64,88 +80,8 @@ func (b *burstBalanceScheduler) Schedule( captureTables[rep.Secondary] = append(captureTables[rep.Secondary], tableID) } } - rmTables := make([]model.TableID, 0) - for tableID := range replications { - _, ok := intersectionTable[tableID] - if !ok { - rmTables = append(rmTables, tableID) - } - } - captureIDs := make([]model.CaptureID, 0, len(captures)) - for captureID := range captures { - captureIDs = append(captureIDs, captureID) - } - // TODO support table re-balance when adding a new capture. 
- tasks := make([]*scheduleTask, 0) - if len(newTables) != 0 { - tasks = append( - tasks, newBurstBalanceAddTables(checkpointTs, newTables, captureIDs)) - if len(newTables) == len(currentTables) { - return tasks - } - } - if len(rmTables) > 0 { - tasks = append( - tasks, newBurstBalanceRemoveTables(rmTables, replications)) - } - - // Skip rebalance if there are some table added or removed. - if len(tasks) == 0 { - // We does not need accept callback here. - accept := (callback)(nil) - task := newBurstBalanceMoveTables(accept, b.random, currentTables, captures, replications) - if task != nil { - tasks = append(tasks, task) - } - } - - return tasks -} - -func newBurstBalanceAddTables( - checkpointTs model.Ts, newTables []model.TableID, captureIDs []model.CaptureID, -) *scheduleTask { - idx := 0 - tables := make([]addTable, 0, len(newTables)) - for _, tableID := range newTables { - tables = append(tables, addTable{ - TableID: tableID, - CaptureID: captureIDs[idx], - CheckpointTs: checkpointTs, - }) - idx++ - if idx >= len(captureIDs) { - idx = 0 - } - } - return &scheduleTask{burstBalance: &burstBalance{ - AddTables: tables, - }} -} - -func newBurstBalanceRemoveTables( - rmTables []model.TableID, replications map[model.TableID]*ReplicationSet, -) *scheduleTask { - tables := make([]removeTable, 0, len(rmTables)) - for _, tableID := range rmTables { - rep := replications[tableID] - var captureID model.CaptureID - if rep.Primary != "" { - captureID = rep.Primary - } else if rep.Secondary != "" { - captureID = rep.Secondary - } else { - log.Warn("tpscheduler: primary or secondary not found for removed table", - zap.Any("table", rep)) - continue - } - tables = append(tables, removeTable{ - TableID: tableID, - CaptureID: captureID, - }) - } - return &scheduleTask{burstBalance: &burstBalance{ - RemoveTables: tables, - }} + // We does not need accept callback here. + accept := (callback)(nil) + return newBurstBalanceMoveTables(accept, random, captures, replications) } diff --git a/cdc/scheduler/internal/tp/scheduler_balance_test.go b/cdc/scheduler/internal/tp/scheduler_balance_test.go index cc3b6249a32..2d680de1eab 100644 --- a/cdc/scheduler/internal/tp/scheduler_balance_test.go +++ b/cdc/scheduler/internal/tp/scheduler_balance_test.go @@ -14,102 +14,44 @@ package tp import ( - "fmt" "testing" + "time" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" ) -func TestSchedulerBalance(t *testing.T) { +func TestSchedulerBalanceCaptureOnline(t *testing.T) { t.Parallel() - captures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}} - currentTables := []model.TableID{1, 2, 3, 4} - - // AddTable only - replications := map[model.TableID]*ReplicationSet{} - b := newBurstBalanceScheduler() - tasks := b.Schedule(0, currentTables, captures, replications) - require.Len(t, tasks, 1) - require.Len(t, tasks[0].burstBalance.AddTables, 4) - require.Equal(t, tasks[0].burstBalance.AddTables[0].TableID, model.TableID(1)) - require.Equal(t, tasks[0].burstBalance.AddTables[1].TableID, model.TableID(2)) - require.Equal(t, tasks[0].burstBalance.AddTables[2].TableID, model.TableID(3)) - require.Equal(t, tasks[0].burstBalance.AddTables[3].TableID, model.TableID(4)) - - // AddTable ReplicationSetStateAbsent. 
- replications = map[model.TableID]*ReplicationSet{ - 1: {State: ReplicationSetStateReplicating, Primary: "a"}, - 2: {State: ReplicationSetStateCommit, Secondary: "b"}, - 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, - 4: {State: ReplicationSetStateAbsent}, - } - tasks = b.Schedule(1, currentTables, captures, replications) - require.Len(t, tasks, 1) - require.Equal(t, tasks[0].burstBalance.AddTables[0].TableID, model.TableID(4)) - require.Equal(t, tasks[0].burstBalance.AddTables[0].CheckpointTs, model.Ts(1)) - - // AddTable 4, and RemoveTable 5. - replications = map[model.TableID]*ReplicationSet{ - 1: {State: ReplicationSetStateReplicating, Primary: "a"}, - 2: {State: ReplicationSetStateCommit, Secondary: "b"}, - 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, - 5: {State: ReplicationSetStateCommit, Primary: "a", Secondary: "b"}, - } - tasks = b.Schedule(2, currentTables, captures, replications) - require.Len(t, tasks, 2) - if tasks[0].burstBalance.AddTables != nil { - require.Equal(t, tasks[0].burstBalance.AddTables[0].TableID, model.TableID(4)) - require.Equal(t, tasks[0].burstBalance.AddTables[0].CheckpointTs, model.Ts(2)) - require.Equal(t, tasks[1].burstBalance.RemoveTables[0].TableID, model.TableID(5)) - } else { - require.Equal(t, tasks[1].burstBalance.AddTables[0].TableID, model.TableID(4)) - require.Equal(t, tasks[0].burstBalance.AddTables[0].CheckpointTs, model.Ts(2)) - require.Equal(t, tasks[0].burstBalance.RemoveTables[0].TableID, model.TableID(5)) - } - - // RemoveTable only. - replications = map[model.TableID]*ReplicationSet{ - 1: {State: ReplicationSetStateReplicating, Primary: "a"}, - 2: {State: ReplicationSetStateCommit, Secondary: "b"}, - 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, - 4: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, - 5: {State: ReplicationSetStatePrepare, Secondary: "b"}, - } - tasks = b.Schedule(3, currentTables, captures, replications) - require.Len(t, tasks, 1) - require.Equal(t, tasks[0].burstBalance.RemoveTables[0].TableID, model.TableID(5)) -} - -func TestSchedulerBalanceCaptureChange(t *testing.T) { - t.Parallel() - - b := newBurstBalanceScheduler() + sched := newBalanceScheduler(&config.SchedulerConfig{ + CheckBalanceInterval: config.TomlDuration(time.Duration(0)), + }) + sched.random = nil - // Capture "b" offline - captures := map[model.CaptureID]*model.CaptureInfo{"a": {}} - currentTables := []model.TableID{1, 2, 3} + // New capture "b" online + captures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}} + currentTables := []model.TableID{1, 2} replications := map[model.TableID]*ReplicationSet{ 1: {State: ReplicationSetStateReplicating, Primary: "a"}, - 2: {State: ReplicationSetStateCommit, Secondary: "b"}, - 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, + 2: {State: ReplicationSetStateReplicating, Primary: "a"}, } - tasks := b.Schedule(0, currentTables, captures, replications) - require.Len(t, tasks, 0) + tasks := sched.Schedule(0, currentTables, captures, replications) + require.Len(t, tasks, 1) + require.Len(t, tasks[0].burstBalance.MoveTables, 1) + require.Equal(t, tasks[0].burstBalance.MoveTables[0].TableID, model.TableID(1)) - // Capture "b" online - b.random = nil + // New capture "b" online again, but this time the check balance interval has not passed yet. 
+ sched.checkBalanceInterval = time.Hour captures = map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}} currentTables = []model.TableID{1, 2} replications = map[model.TableID]*ReplicationSet{ 1: {State: ReplicationSetStateReplicating, Primary: "a"}, 2: {State: ReplicationSetStateReplicating, Primary: "a"}, } - tasks = b.Schedule(0, currentTables, captures, replications) - require.Len(t, tasks, 1) - require.Len(t, tasks[0].burstBalance.MoveTables, 1) - require.Equal(t, tasks[0].burstBalance.MoveTables[0].TableID, model.TableID(1)) + tasks = sched.Schedule(0, currentTables, captures, replications) + require.Len(t, tasks, 0) // TODO revise balance algorithm and enable the test case. // @@ -127,99 +69,3 @@ func TestSchedulerBalanceCaptureChange(t *testing.T) { // require.Len(t, tasks[0].burstBalance.MoveTables, 1) // require.Equal(t, tasks[0].burstBalance.MoveTables[0].TableID, model.TableID(1)) } - -func benchmarkSchedulerBalance( - b *testing.B, - factory func(total int) ( - name string, - currentTables []model.TableID, - captures map[model.CaptureID]*model.CaptureInfo, - replications map[model.TableID]*ReplicationSet, - ), -) { - size := 16384 - for total := 1; total <= size; total *= 2 { - name, currentTables, captures, replications := factory(total) - bal := newBurstBalanceScheduler() - b.ResetTimer() - b.Run(name, func(b *testing.B) { - for i := 0; i < b.N; i++ { - bal.Schedule(0, currentTables, captures, replications) - } - }) - b.StopTimer() - } -} - -func BenchmarkSchedulerBalanceAddTables(b *testing.B) { - benchmarkSchedulerBalance(b, func(total int) ( - name string, - currentTables []model.TableID, - captures map[model.CaptureID]*model.CaptureInfo, - replications map[model.TableID]*ReplicationSet, - ) { - const captureCount = 8 - captures = map[model.CaptureID]*model.CaptureInfo{} - for i := 0; i < captureCount; i++ { - captures[fmt.Sprint(i)] = &model.CaptureInfo{} - } - currentTables = make([]model.TableID, 0, total) - for i := 0; i < total; i++ { - currentTables = append(currentTables, int64(10000+i)) - } - replications = map[model.TableID]*ReplicationSet{} - name = fmt.Sprintf("AddTable %d", total) - return name, currentTables, captures, replications - }) -} - -func BenchmarkSchedulerBalanceRemoveTables(b *testing.B) { - benchmarkSchedulerBalance(b, func(total int) ( - name string, - currentTables []model.TableID, - captures map[model.CaptureID]*model.CaptureInfo, - replications map[model.TableID]*ReplicationSet, - ) { - const captureCount = 8 - captures = map[model.CaptureID]*model.CaptureInfo{} - for i := 0; i < captureCount; i++ { - captures[fmt.Sprint(i)] = &model.CaptureInfo{} - } - currentTables = make([]model.TableID, 0, total) - replications = map[model.TableID]*ReplicationSet{} - for i := 0; i < total; i++ { - replications[int64(10000+i)] = &ReplicationSet{ - Primary: fmt.Sprint(i % captureCount), - } - } - name = fmt.Sprintf("RemoveTable %d", total) - return name, currentTables, captures, replications - }) -} - -func BenchmarkSchedulerBalanceAddRemoveTables(b *testing.B) { - benchmarkSchedulerBalance(b, func(total int) ( - name string, - currentTables []model.TableID, - captures map[model.CaptureID]*model.CaptureInfo, - replications map[model.TableID]*ReplicationSet, - ) { - const captureCount = 8 - captures = map[model.CaptureID]*model.CaptureInfo{} - for i := 0; i < captureCount; i++ { - captures[fmt.Sprint(i)] = &model.CaptureInfo{} - } - currentTables = make([]model.TableID, 0, total) - for i := 0; i < total/2; i++ { - currentTables = append(currentTables, 
int64(100000+i)) - } - replications = map[model.TableID]*ReplicationSet{} - for i := 0; i < total/2; i++ { - replications[int64(200000+i)] = &ReplicationSet{ - Primary: fmt.Sprint(i % captureCount), - } - } - name = fmt.Sprintf("AddRemoveTable %d", total) - return name, currentTables, captures, replications - }) -} diff --git a/cdc/scheduler/internal/tp/scheduler_basic.go b/cdc/scheduler/internal/tp/scheduler_basic.go new file mode 100644 index 00000000000..cbedcf2cace --- /dev/null +++ b/cdc/scheduler/internal/tp/scheduler_basic.go @@ -0,0 +1,163 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tp + +import ( + "math/rand" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/zap" +) + +var _ scheduler = &basicScheduler{} + +// The basic scheduler for adding and removing tables. It tries to keep +// every table replicated. +// +// It handles the following scenarios: +// 1. Initial table dispatch. +// 2. DDL CREATE/DROP/TRUNCATE TABLE. +// 3. Capture offline. +type basicScheduler struct { + random *rand.Rand + lastRebalanceTime time.Time + checkBalanceInterval time.Duration +} + +func newBasicScheduler() *basicScheduler { + return &basicScheduler{ + random: rand.New(rand.NewSource(time.Now().UnixNano())), + } +} + +func (b *basicScheduler) Name() string { + return string(schedulerTypeBasic) +} + +func (b *basicScheduler) Schedule( + checkpointTs model.Ts, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + replications map[model.TableID]*ReplicationSet, +) []*scheduleTask { + tasks := make([]*scheduleTask, 0) + tablesLenEqual := len(currentTables) == len(replications) + tablesAllFind := true + newTables := make([]model.TableID, 0) + for _, tableID := range currentTables { + rep, ok := replications[tableID] + if !ok { + newTables = append(newTables, tableID) + // The table is not in replications, which means the two sets are +// not identical. + tablesAllFind = false + continue + } + if rep.State == ReplicationSetStateAbsent { + newTables = append(newTables, tableID) + } + } + + // Build add table tasks. + if len(newTables) > 0 { + captureIDs := make([]model.CaptureID, 0, len(captures)) + for captureID := range captures { + captureIDs = append(captureIDs, captureID) + } + tasks = append( + tasks, newBurstBalanceAddTables(checkpointTs, newTables, captureIDs)) + if len(newTables) == len(currentTables) { + // This is the initial balance: every current table is new, return early. + return tasks + } + } + + // Build remove table tasks. + // Most of the time, removing tables is unlikely to happen. + // + // Fast path for checking whether the two sets are identical: + // the lengths of currentTables and replications are equal, and + // every table in currentTables has a record in replications. + if !tablesLenEqual || !tablesAllFind { + // The two sets are not identical. We need to find removed tables. 
+ intersectionTable := make(map[model.TableID]struct{}, len(currentTables)) + for _, tableID := range currentTables { + _, ok := replications[tableID] + if !ok { + continue + } + intersectionTable[tableID] = struct{}{} + } + rmTables := make([]model.TableID, 0) + for tableID := range replications { + _, ok := intersectionTable[tableID] + if !ok { + rmTables = append(rmTables, tableID) + } + } + if len(rmTables) > 0 { + tasks = append(tasks, newBurstBalanceRemoveTables(rmTables, replications)) + } + } + return tasks +} + +func newBurstBalanceAddTables( + checkpointTs model.Ts, newTables []model.TableID, captureIDs []model.CaptureID, +) *scheduleTask { + idx := 0 + tables := make([]addTable, 0, len(newTables)) + for _, tableID := range newTables { + tables = append(tables, addTable{ + TableID: tableID, + CaptureID: captureIDs[idx], + CheckpointTs: checkpointTs, + }) + idx++ + if idx >= len(captureIDs) { + idx = 0 + } + } + return &scheduleTask{burstBalance: &burstBalance{ + AddTables: tables, + }} +} + +func newBurstBalanceRemoveTables( + rmTables []model.TableID, replications map[model.TableID]*ReplicationSet, +) *scheduleTask { + tables := make([]removeTable, 0, len(rmTables)) + for _, tableID := range rmTables { + rep := replications[tableID] + var captureID model.CaptureID + if rep.Primary != "" { + captureID = rep.Primary + } else if rep.Secondary != "" { + captureID = rep.Secondary + } else { + log.Warn("tpscheduler: primary or secondary not found for removed table", + zap.Any("table", rep)) + continue + } + tables = append(tables, removeTable{ + TableID: tableID, + CaptureID: captureID, + }) + } + return &scheduleTask{burstBalance: &burstBalance{ + RemoveTables: tables, + }} +} diff --git a/cdc/scheduler/internal/tp/scheduler_basic_test.go b/cdc/scheduler/internal/tp/scheduler_basic_test.go new file mode 100644 index 00000000000..463096cd127 --- /dev/null +++ b/cdc/scheduler/internal/tp/scheduler_basic_test.go @@ -0,0 +1,188 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tp + +import ( + "fmt" + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestSchedulerBasic(t *testing.T) { + t.Parallel() + + captures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}} + currentTables := []model.TableID{1, 2, 3, 4} + + // Initial table dispatch. + // AddTable only + replications := map[model.TableID]*ReplicationSet{} + b := newBasicScheduler() + tasks := b.Schedule(0, currentTables, captures, replications) + require.Len(t, tasks, 1) + require.Len(t, tasks[0].burstBalance.AddTables, 4) + require.Equal(t, tasks[0].burstBalance.AddTables[0].TableID, model.TableID(1)) + require.Equal(t, tasks[0].burstBalance.AddTables[1].TableID, model.TableID(2)) + require.Equal(t, tasks[0].burstBalance.AddTables[2].TableID, model.TableID(3)) + require.Equal(t, tasks[0].burstBalance.AddTables[3].TableID, model.TableID(4)) + + // Capture offline, causes ReplicationSetStateAbsent. + // AddTable only. 
+ replications = map[model.TableID]*ReplicationSet{ + 1: {State: ReplicationSetStateReplicating, Primary: "a"}, + 2: {State: ReplicationSetStateCommit, Secondary: "b"}, + 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, + 4: {State: ReplicationSetStateAbsent}, + } + tasks = b.Schedule(1, currentTables, captures, replications) + require.Len(t, tasks, 1) + require.Equal(t, tasks[0].burstBalance.AddTables[0].TableID, model.TableID(4)) + require.Equal(t, tasks[0].burstBalance.AddTables[0].CheckpointTs, model.Ts(1)) + + // DDL CREATE/DROP/TRUNCATE TABLE. + // AddTable 4, and RemoveTable 5. + replications = map[model.TableID]*ReplicationSet{ + 1: {State: ReplicationSetStateReplicating, Primary: "a"}, + 2: {State: ReplicationSetStateCommit, Secondary: "b"}, + 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, + 5: {State: ReplicationSetStateCommit, Primary: "a", Secondary: "b"}, + } + tasks = b.Schedule(2, currentTables, captures, replications) + require.Len(t, tasks, 2) + if tasks[0].burstBalance.AddTables != nil { + require.Equal(t, tasks[0].burstBalance.AddTables[0].TableID, model.TableID(4)) + require.Equal(t, tasks[0].burstBalance.AddTables[0].CheckpointTs, model.Ts(2)) + require.Equal(t, tasks[1].burstBalance.RemoveTables[0].TableID, model.TableID(5)) + } else { + require.Equal(t, tasks[1].burstBalance.AddTables[0].TableID, model.TableID(4)) + require.Equal(t, tasks[0].burstBalance.AddTables[0].CheckpointTs, model.Ts(2)) + require.Equal(t, tasks[0].burstBalance.RemoveTables[0].TableID, model.TableID(5)) + } + + // RemoveTable only. + replications = map[model.TableID]*ReplicationSet{ + 1: {State: ReplicationSetStateReplicating, Primary: "a"}, + 2: {State: ReplicationSetStateCommit, Secondary: "b"}, + 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, + 4: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, + 5: {State: ReplicationSetStatePrepare, Secondary: "b"}, + } + tasks = b.Schedule(3, currentTables, captures, replications) + require.Len(t, tasks, 1) + require.Equal(t, tasks[0].burstBalance.RemoveTables[0].TableID, model.TableID(5)) +} + +func benchmarkSchedulerBalance( + b *testing.B, + factory func(total int) ( + name string, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + replications map[model.TableID]*ReplicationSet, + sched scheduler, + ), +) { + size := 16384 + for total := 1; total <= size; total *= 2 { + name, currentTables, captures, replications, sched := factory(total) + b.ResetTimer() + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + sched.Schedule(0, currentTables, captures, replications) + } + }) + b.StopTimer() + } +} + +func BenchmarkSchedulerBasicAddTables(b *testing.B) { + benchmarkSchedulerBalance(b, func(total int) ( + name string, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + replications map[model.TableID]*ReplicationSet, + sched scheduler, + ) { + const captureCount = 8 + captures = map[model.CaptureID]*model.CaptureInfo{} + for i := 0; i < captureCount; i++ { + captures[fmt.Sprint(i)] = &model.CaptureInfo{} + } + currentTables = make([]model.TableID, 0, total) + for i := 0; i < total; i++ { + currentTables = append(currentTables, int64(10000+i)) + } + replications = map[model.TableID]*ReplicationSet{} + name = fmt.Sprintf("AddTable %d", total) + sched = newBasicScheduler() + return name, currentTables, captures, replications, sched + }) +} + +func BenchmarkSchedulerBasicRemoveTables(b *testing.B) 
{ + benchmarkSchedulerBalance(b, func(total int) ( + name string, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + replications map[model.TableID]*ReplicationSet, + sched scheduler, + ) { + const captureCount = 8 + captures = map[model.CaptureID]*model.CaptureInfo{} + for i := 0; i < captureCount; i++ { + captures[fmt.Sprint(i)] = &model.CaptureInfo{} + } + currentTables = make([]model.TableID, 0, total) + replications = map[model.TableID]*ReplicationSet{} + for i := 0; i < total; i++ { + replications[int64(10000+i)] = &ReplicationSet{ + Primary: fmt.Sprint(i % captureCount), + } + } + name = fmt.Sprintf("RemoveTable %d", total) + sched = newBasicScheduler() + return name, currentTables, captures, replications, sched + }) +} + +func BenchmarkSchedulerBasicAddRemoveTables(b *testing.B) { + benchmarkSchedulerBalance(b, func(total int) ( + name string, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + replications map[model.TableID]*ReplicationSet, + sched scheduler, + ) { + const captureCount = 8 + captures = map[model.CaptureID]*model.CaptureInfo{} + for i := 0; i < captureCount; i++ { + captures[fmt.Sprint(i)] = &model.CaptureInfo{} + } + currentTables = make([]model.TableID, 0, total) + for i := 0; i < total/2; i++ { + currentTables = append(currentTables, int64(100000+i)) + } + replications = map[model.TableID]*ReplicationSet{} + for i := 0; i < total/2; i++ { + replications[int64(200000+i)] = &ReplicationSet{ + Primary: fmt.Sprint(i % captureCount), + } + } + name = fmt.Sprintf("AddRemoveTable %d", total) + sched = newBasicScheduler() + return name, currentTables, captures, replications, sched + }) +} diff --git a/cdc/scheduler/internal/tp/scheduler_rebalance.go b/cdc/scheduler/internal/tp/scheduler_rebalance.go index 5eba95d0724..6b3bdd9eb0e 100644 --- a/cdc/scheduler/internal/tp/scheduler_rebalance.go +++ b/cdc/scheduler/internal/tp/scheduler_rebalance.go @@ -73,7 +73,7 @@ func (r *rebalanceScheduler) Schedule( accept := func() { atomic.StoreInt32(&r.rebalance, 0) } - task := newBurstBalanceMoveTables(accept, r.random, currentTables, captures, replications) + task := newBurstBalanceMoveTables(accept, r.random, captures, replications) if task == nil { return nil } @@ -83,7 +83,6 @@ func (r *rebalanceScheduler) Schedule( func newBurstBalanceMoveTables( accept callback, random *rand.Rand, - currentTables []model.TableID, captures map[model.CaptureID]*model.CaptureInfo, replications map[model.TableID]*ReplicationSet, ) *scheduleTask { @@ -93,8 +92,8 @@ func newBurstBalanceMoveTables( } for tableID, rep := range replications { - if rep.Primary == "" { - log.Info("tpscheduler: rebalance found table no primary, skip", + if rep.State != ReplicationSetStateReplicating { + log.Info("tpscheduler: rebalance skip tables that are not in Replicating", zap.Int64("tableID", tableID), zap.Any("replication", rep)) continue diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 7ae232baf19..cacf54ff171 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -210,8 +210,9 @@ func TestParseCfg(t *testing.T) { }, EnableTwoPhaseScheduler: false, Scheduler: &config.SchedulerConfig{ - HeartbeatTick: 2, - MaxTaskConcurrency: 10, + HeartbeatTick: 2, + MaxTaskConcurrency: 10, + CheckBalanceInterval: 60000000000, }, }, }, o.serverConfig) @@ -281,6 +282,7 @@ server-worker-pool-size = 16 [debug.scheduler] heartbeat-tick = 3 max-task-concurrency = 11 +check-balance-interval = "10s" `, dataDir) 
err := os.WriteFile(configPath, []byte(configContent), 0o644) require.Nil(t, err) @@ -364,8 +366,9 @@ max-task-concurrency = 11 }, EnableTwoPhaseScheduler: true, Scheduler: &config.SchedulerConfig{ - HeartbeatTick: 3, - MaxTaskConcurrency: 11, + HeartbeatTick: 3, + MaxTaskConcurrency: 11, + CheckBalanceInterval: config.TomlDuration(10 * time.Second), }, }, }, o.serverConfig) @@ -511,8 +514,9 @@ cert-allowed-cn = ["dd","ee"] }, EnableTwoPhaseScheduler: false, Scheduler: &config.SchedulerConfig{ - HeartbeatTick: 2, - MaxTaskConcurrency: 10, + HeartbeatTick: 2, + MaxTaskConcurrency: 10, + CheckBalanceInterval: 60000000000, }, }, }, o.serverConfig) @@ -575,8 +579,9 @@ unknown3 = 3 }, EnableTwoPhaseScheduler: false, Scheduler: &config.SchedulerConfig{ - HeartbeatTick: 2, - MaxTaskConcurrency: 10, + HeartbeatTick: 2, + MaxTaskConcurrency: 10, + CheckBalanceInterval: 60000000000, }, }, o.serverConfig.Debug) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index f2d221418b9..32518ffd5ec 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -132,7 +132,8 @@ const ( "enable-2phase-scheduler": false, "scheduler": { "heartbeat-tick": 2, - "max-task-concurrency": 10 + "max-task-concurrency": 10, + "check-balance-interval": 60000000000 } } }` diff --git a/pkg/config/scheduler_config.go b/pkg/config/scheduler_config.go index 104876586bb..756aad51177 100644 --- a/pkg/config/scheduler_config.go +++ b/pkg/config/scheduler_config.go @@ -14,22 +14,31 @@ package config import ( + "time" + cerrors "github.com/pingcap/tiflow/pkg/errors" ) // SchedulerConfig configs TiCDC scheduler. type SchedulerConfig struct { - HeartbeatTick int `toml:"heartbeat-tick" json:"heartbeat-tick"` - MaxTaskConcurrency int `toml:"max-task-concurrency" json:"max-task-concurrency"` + HeartbeatTick int `toml:"heartbeat-tick" json:"heartbeat-tick"` + MaxTaskConcurrency int `toml:"max-task-concurrency" json:"max-task-concurrency"` + CheckBalanceInterval TomlDuration `toml:"check-balance-interval" json:"check-balance-interval"` } // ValidateAndAdjust verifies that each parameter is valid. 
func (c *SchedulerConfig) ValidateAndAdjust() error { if c.HeartbeatTick <= 0 { - return cerrors.ErrInvalidServerOption.GenWithStackByArgs("heartbeat-tick must be larger than 0") + return cerrors.ErrInvalidServerOption.GenWithStackByArgs( + "heartbeat-tick must be larger than 0") } if c.MaxTaskConcurrency <= 0 { - return cerrors.ErrInvalidServerOption.GenWithStackByArgs("max-task-concurrency must be larger than 0") + return cerrors.ErrInvalidServerOption.GenWithStackByArgs( + "max-task-concurrency must be larger than 0") + } + if time.Duration(c.CheckBalanceInterval) <= time.Second { + return cerrors.ErrInvalidServerOption.GenWithStackByArgs( + "check-balance-interval must be larger than 1s") } return nil } diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 511d5ddb563..c2b309e3a2e 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -133,8 +133,9 @@ var defaultServerConfig = &ServerConfig{ EnableTwoPhaseScheduler: false, Scheduler: &SchedulerConfig{ - HeartbeatTick: 2, - MaxTaskConcurrency: 10, + HeartbeatTick: 2, + MaxTaskConcurrency: 10, + CheckBalanceInterval: TomlDuration(time.Minute), }, }, } diff --git a/pkg/config/server_config_test.go b/pkg/config/server_config_test.go index aadcdca1330..f6a2a31ced4 100644 --- a/pkg/config/server_config_test.go +++ b/pkg/config/server_config_test.go @@ -119,4 +119,9 @@ func TestSchedulerConfigValidateAndAdjust(t *testing.T) { require.Error(t, conf.ValidateAndAdjust()) conf.MaxTaskConcurrency = 0 require.Error(t, conf.ValidateAndAdjust()) + + conf.CheckBalanceInterval = -1 + require.Error(t, conf.ValidateAndAdjust()) + conf.CheckBalanceInterval = TomlDuration(time.Second) + require.Error(t, conf.ValidateAndAdjust()) }
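A minimal usage sketch of the new check-balance-interval knob, relying only on the config.SchedulerConfig fields, TomlDuration, and ValidateAndAdjust shown in this patch; the standalone main package is purely illustrative.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// Mirror the new defaults: heartbeat-tick = 2, max-task-concurrency = 10,
	// check-balance-interval = 1m.
	cfg := &config.SchedulerConfig{
		HeartbeatTick:        2,
		MaxTaskConcurrency:   10,
		CheckBalanceInterval: config.TomlDuration(time.Minute),
	}
	if err := cfg.ValidateAndAdjust(); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("1m interval accepted")

	// Anything at or below one second is rejected, matching the
	// "check-balance-interval must be larger than 1s" rule above.
	cfg.CheckBalanceInterval = config.TomlDuration(time.Second)
	fmt.Println("1s interval rejected:", cfg.ValidateAndAdjust() != nil)
}

The balance scheduler picks this setting up through newBalanceScheduler, storing it in checkBalanceInterval, so Schedule returns nil until the interval has elapsed since the last rebalance.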