Skip to content

Commit

Permalink
tp: separate balance scheduler to basic and balance (pingcap#5842)
Browse files Browse the repository at this point in the history
* tp: clean up stale capture table metrics

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: refine balance scheduler

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: separate balance scheduler to basic and balance

Signed-off-by: Neil Shen <overvenus@gmail.com>

* fix tests

Signed-off-by: Neil Shen <overvenus@gmail.com>

* Apply suggestions from code review

Co-authored-by: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>

* address comments

Signed-off-by: Neil Shen <overvenus@gmail.com>

Co-authored-by: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
  • Loading branch information
overvenus and 3AceShowHand committed Jun 14, 2022
1 parent 404269b commit 5ce736d
Show file tree
Hide file tree
Showing 14 changed files with 461 additions and 302 deletions.
3 changes: 3 additions & 0 deletions cdc/scheduler/internal/tp/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ func (c *captureManager) HandleAliveCaptureUpdate(
c.changes.Removed = make(map[string][]schedulepb.TableStatus)
}
c.changes.Removed[id] = capture.Tables

cf := c.changefeedID
captureTableGauge.DeleteLabelValues(cf.Namespace, cf.ID, capture.Addr)
}
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/scheduler/internal/tp/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func newCoordinator(
) *coordinator {
revision := schedulepb.OwnerRevision{Revision: ownerRevision}
schedulers := make(map[schedulerType]scheduler)
schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
schedulers[schedulerTypeBasic] = newBasicScheduler()
schedulers[schedulerTypeBalance] = newBalanceScheduler(cfg)
schedulers[schedulerTypeMoveTable] = newMoveTableScheduler()
schedulers[schedulerTypeRebalance] = newRebalanceScheduler()

Expand Down
11 changes: 6 additions & 5 deletions cdc/scheduler/internal/tp/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func TestCoordinatorHeartbeat(t *testing.T) {
require.Nil(t, err)
require.True(t, coord.captureM.CheckAllCaptureInitialized())
msgs = trans.sendBuffer
require.Len(t, msgs, 1)
require.Len(t, msgs, 2)
// Basic scheduler, make sure all tables get replicated.
require.EqualValues(t, 3, msgs[0].DispatchTableRequest.GetAddTable().TableID)
require.Len(t, coord.replicationM.tables, 3)
}
Expand Down Expand Up @@ -307,7 +308,7 @@ func benchmarkCoordinator(
) {
log.SetLevel(zapcore.DPanicLevel)
ctx := context.Background()
size := 16384
size := 131072 // 2^17
for total := 1; total <= size; total *= 2 {
name, coord, currentTables, captures := factory(total)
b.ResetTimer()
Expand Down Expand Up @@ -337,7 +338,7 @@ func BenchmarkCoordinatorInit(b *testing.B) {
currentTables = append(currentTables, int64(10000+i))
}
schedulers := make(map[schedulerType]scheduler)
schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
schedulers[schedulerTypeBasic] = newBasicScheduler()
coord = &coordinator{
trans: &mockTrans{},
schedulers: schedulers,
Expand Down Expand Up @@ -377,7 +378,7 @@ func BenchmarkCoordinatorHeartbeat(b *testing.B) {
currentTables = append(currentTables, int64(10000+i))
}
schedulers := make(map[schedulerType]scheduler)
schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
schedulers[schedulerTypeBasic] = newBasicScheduler()
coord = &coordinator{
trans: &mockTrans{},
schedulers: schedulers,
Expand Down Expand Up @@ -452,7 +453,7 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
keepRecvBuffer: true,
}
schedulers := make(map[schedulerType]scheduler)
schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
schedulers[schedulerTypeBasic] = newBasicScheduler()
coord = &coordinator{
trans: trans,
schedulers: schedulers,
Expand Down
7 changes: 4 additions & 3 deletions cdc/scheduler/internal/tp/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ type scheduler interface {
type schedulerType string

const (
schedulerTypeBurstBalance schedulerType = "burst-balance-scheduler"
schedulerTypeMoveTable schedulerType = "move-table-scheduler"
schedulerTypeRebalance schedulerType = "rebalance-scheduler"
schedulerTypeBasic schedulerType = "basic-scheduler"
schedulerTypeBalance schedulerType = "balance-scheduler"
schedulerTypeMoveTable schedulerType = "move-table-scheduler"
schedulerTypeRebalance schedulerType = "rebalance-scheduler"
)
138 changes: 37 additions & 101 deletions cdc/scheduler/internal/tp/scheduler_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,135 +17,71 @@ import (
"math/rand"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
"github.com/pingcap/tiflow/pkg/config"
)

var _ scheduler = &burstBalanceScheduler{}
var _ scheduler = &balanceScheduler{}

type burstBalanceScheduler struct {
random *rand.Rand
// The scheduler for balancing tables among captures.
type balanceScheduler struct {
random *rand.Rand
lastRebalanceTime time.Time
checkBalanceInterval time.Duration
}

func newBurstBalanceScheduler() *burstBalanceScheduler {
return &burstBalanceScheduler{
random: rand.New(rand.NewSource(time.Now().UnixNano())),
func newBalanceScheduler(cfg *config.SchedulerConfig) *balanceScheduler {
return &balanceScheduler{
random: rand.New(rand.NewSource(time.Now().UnixNano())),
checkBalanceInterval: time.Duration(cfg.CheckBalanceInterval),
}
}

func (b *burstBalanceScheduler) Name() string {
return string(schedulerTypeBurstBalance)
func (b *balanceScheduler) Name() string {
return string(schedulerTypeBalance)
}

func (b *burstBalanceScheduler) Schedule(
func (b *balanceScheduler) Schedule(
checkpointTs model.Ts,
currentTables []model.TableID,
captures map[model.CaptureID]*model.CaptureInfo,
replications map[model.TableID]*ReplicationSet,
) []*scheduleTask {
newTables := make([]model.TableID, 0)
intersectionTable := make(map[model.TableID]struct{})
now := time.Now()
if now.Sub(b.lastRebalanceTime) < b.checkBalanceInterval {
return nil
}
b.lastRebalanceTime = now

tasks := make([]*scheduleTask, 0)
task := buildBurstBalanceMoveTables(b.random, currentTables, captures, replications)
if task != nil {
tasks = append(tasks, task)
}
return tasks
}

func buildBurstBalanceMoveTables(
random *rand.Rand,
currentTables []model.TableID,
captures map[model.CaptureID]*model.CaptureInfo,
replications map[model.TableID]*ReplicationSet,
) *scheduleTask {
captureTables := make(map[model.CaptureID][]model.TableID)
for _, tableID := range currentTables {
rep, ok := replications[tableID]
if !ok {
newTables = append(newTables, tableID)
continue
}
if rep.State == ReplicationSetStateAbsent {
newTables = append(newTables, tableID)
}
intersectionTable[tableID] = struct{}{}
if rep.Primary != "" {
captureTables[rep.Primary] = append(captureTables[rep.Primary], tableID)
}
if rep.Secondary != "" {
captureTables[rep.Secondary] = append(captureTables[rep.Secondary], tableID)
}
}
rmTables := make([]model.TableID, 0)
for tableID := range replications {
_, ok := intersectionTable[tableID]
if !ok {
rmTables = append(rmTables, tableID)
}
}

captureIDs := make([]model.CaptureID, 0, len(captures))
for captureID := range captures {
captureIDs = append(captureIDs, captureID)
}
// TODO support table re-balance when adding a new capture.
tasks := make([]*scheduleTask, 0)
if len(newTables) != 0 {
tasks = append(
tasks, newBurstBalanceAddTables(checkpointTs, newTables, captureIDs))
if len(newTables) == len(currentTables) {
return tasks
}
}
if len(rmTables) > 0 {
tasks = append(
tasks, newBurstBalanceRemoveTables(rmTables, replications))
}

// Skip rebalancing if any tables were added or removed.
if len(tasks) == 0 {
// We do not need an accept callback here.
accept := (callback)(nil)
task := newBurstBalanceMoveTables(accept, b.random, currentTables, captures, replications)
if task != nil {
tasks = append(tasks, task)
}
}

return tasks
}

func newBurstBalanceAddTables(
checkpointTs model.Ts, newTables []model.TableID, captureIDs []model.CaptureID,
) *scheduleTask {
idx := 0
tables := make([]addTable, 0, len(newTables))
for _, tableID := range newTables {
tables = append(tables, addTable{
TableID: tableID,
CaptureID: captureIDs[idx],
CheckpointTs: checkpointTs,
})
idx++
if idx >= len(captureIDs) {
idx = 0
}
}
return &scheduleTask{burstBalance: &burstBalance{
AddTables: tables,
}}
}

func newBurstBalanceRemoveTables(
rmTables []model.TableID, replications map[model.TableID]*ReplicationSet,
) *scheduleTask {
tables := make([]removeTable, 0, len(rmTables))
for _, tableID := range rmTables {
rep := replications[tableID]
var captureID model.CaptureID
if rep.Primary != "" {
captureID = rep.Primary
} else if rep.Secondary != "" {
captureID = rep.Secondary
} else {
log.Warn("tpscheduler: primary or secondary not found for removed table",
zap.Any("table", rep))
continue
}
tables = append(tables, removeTable{
TableID: tableID,
CaptureID: captureID,
})
}
return &scheduleTask{burstBalance: &burstBalance{
RemoveTables: tables,
}}
// We do not need an accept callback here.
accept := (callback)(nil)
return newBurstBalanceMoveTables(accept, random, captures, replications)
}
Loading

0 comments on commit 5ce736d

Please sign in to comment.