tp(cdc): support manual move table and rebalance (#5711)
* add all changes.

* also check capture status before rebalancing tables.

* add basic move table scheduler implementation.

* add basic implementation of manual scheduling api.

* fix rebalance.

* add test for manual move table.

* revert change in pb.go

* fix

* fix by review.
3AceShowHand authored and ti-chi-bot committed Jun 24, 2022
1 parent d0ec96b commit 542b558
Showing 16 changed files with 642 additions and 51 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
@@ -73,7 +73,7 @@ type TableMeta struct {
TableID model.TableID
CheckpointTs model.Ts
ResolvedTs model.Ts
Status TableState
State TableState
}

// TablePipeline is a pipeline which capture the change log from tikv in a table
4 changes: 2 additions & 2 deletions cdc/processor/processor.go
@@ -374,14 +374,14 @@ func (p *processor) GetTableMeta(tableID model.TableID) pipeline.TableMeta {
TableID: tableID,
CheckpointTs: 0,
ResolvedTs: 0,
Status: pipeline.TableStateAbsent,
State: pipeline.TableStateAbsent,
}
}
return pipeline.TableMeta{
TableID: tableID,
CheckpointTs: table.CheckpointTs(),
ResolvedTs: table.ResolvedTs(),
Status: table.Status(),
State: table.Status(),
}
}

9 changes: 6 additions & 3 deletions cdc/processor/processor_test.go
@@ -260,6 +260,9 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) {
table1 := p.tables[1].(*mockTablePipeline)
require.Equal(t, model.Ts(20), table1.sinkStartTs)
require.Equal(t, pipeline.TableStateReplicating, table1.status)
meta := p.GetTableMeta(model.TableID(1))
require.Equal(t, model.TableID(1), meta.TableID)
require.Equal(t, pipeline.TableStateReplicating, meta.State)

ok, err = p.AddTable(ctx, 2, 20, false)
require.Nil(t, err)
@@ -380,6 +383,9 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) {
checkpointTs, done = p.IsRemoveTableFinished(ctx, 3)
require.True(t, done)
require.Equal(t, model.Ts(65), checkpointTs)
meta = p.GetTableMeta(model.TableID(3))
require.Equal(t, model.TableID(3), meta.TableID)
require.Equal(t, pipeline.TableStateAbsent, meta.State)

require.Len(t, p.tables, 3)
require.True(t, table3.canceled)
@@ -671,6 +677,3 @@ func TestUpdateBarrierTs(t *testing.T) {
tb = p.tables[model.TableID(1)].(*mockTablePipeline)
require.Equal(t, tb.barrierTs, uint64(15))
}

func TestGetTableMeta(t *testing.T) {
}
8 changes: 4 additions & 4 deletions cdc/scheduler/internal/tp/agent.go
@@ -116,7 +116,7 @@ func NewAgent(ctx context.Context,
zap.String("namespace", changeFeedID.Namespace),
zap.String("changefeed", changeFeedID.ID),
zap.Error(err))
return nil, nil
return result, nil
}
return nil, err
}
@@ -212,7 +212,7 @@ func (a *agent) tableStatus2PB(status pipeline.TableState) schedulepb.TableState

func (a *agent) newTableStatus(tableID model.TableID) schedulepb.TableStatus {
meta := a.tableExec.GetTableMeta(tableID)
state := a.tableStatus2PB(meta.Status)
state := a.tableStatus2PB(meta.State)

if task, ok := a.runningTasks[tableID]; ok {
// remove table task is not processed, or failed,
@@ -295,8 +295,8 @@ func (a *agent) handleMessageDispatchTableRequest(
zap.String("capture", a.captureID),
zap.String("namespace", a.changeFeedID.Namespace),
zap.String("changefeed", a.changeFeedID.ID),
zap.Any("epoch", epoch),
zap.Any("expected", a.epoch))
zap.String("epoch", epoch.Epoch),
zap.String("expected", a.epoch.Epoch))
return
}
task := new(dispatchTableTask)
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/agent_test.go
@@ -583,6 +583,6 @@ func (e *MockTableExecutor) GetTableMeta(tableID model.TableID) pipeline.TableMe
TableID: tableID,
CheckpointTs: 0,
ResolvedTs: 0,
Status: state,
State: state,
}
}
18 changes: 11 additions & 7 deletions cdc/scheduler/internal/tp/capture_manager.go
@@ -41,6 +41,16 @@ const (
CaptureStateStopping CaptureState = 3
)

var captureStateMap = map[CaptureState]string{
CaptureStateUninitialized: "CaptureStateUninitialized",
CaptureStateInitialized: "CaptureStateInitialized",
CaptureStateStopping: "CaptureStateStopping",
}

func (s CaptureState) String() string {
return captureStateMap[s]
}

// CaptureStatus represent capture's status.
type CaptureStatus struct {
OwnerRev schedulepb.OwnerRevision
@@ -101,13 +111,7 @@ func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *capture
}

func (c *captureManager) CheckAllCaptureInitialized() bool {
if !c.checkAllCaptureInitialized() {
return false
}
if !c.initialized {
return false
}
return true
return c.initialized && c.checkAllCaptureInitialized()
}

func (c *captureManager) checkAllCaptureInitialized() bool {
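The new String method makes CaptureState satisfy fmt.Stringer, so structured log fields print a readable state name instead of a bare integer. Below is a minimal, self-contained sketch of the same pattern; the numeric values of the first two constants are assumed for illustration, since this hunk only shows CaptureStateStopping = 3.

```go
package main

import "fmt"

// CaptureState mirrors the enum in capture_manager.go.
type CaptureState int

const (
	// Values for the first two states are assumed for illustration;
	// the diff only shows CaptureStateStopping CaptureState = 3.
	CaptureStateUninitialized CaptureState = 1
	CaptureStateInitialized   CaptureState = 2
	CaptureStateStopping      CaptureState = 3
)

var captureStateMap = map[CaptureState]string{
	CaptureStateUninitialized: "CaptureStateUninitialized",
	CaptureStateInitialized:   "CaptureStateInitialized",
	CaptureStateStopping:      "CaptureStateStopping",
}

// String makes CaptureState satisfy fmt.Stringer, so loggers print the name.
func (s CaptureState) String() string { return captureStateMap[s] }

func main() {
	fmt.Printf("capture state: %v\n", CaptureStateStopping) // capture state: CaptureStateStopping
}
```

The CheckAllCaptureInitialized rewrite is behavior-preserving: `c.initialized && c.checkAllCaptureInitialized()` returns true exactly when both branches of the old if/else chain passed; only the evaluation order of the two checks changes.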
71 changes: 53 additions & 18 deletions cdc/scheduler/internal/tp/coordinator.go
@@ -29,24 +29,14 @@ import (

const checkpointCannotProceed = internal.CheckpointCannotProceed

type scheduler interface {
Name() string
Schedule(
checkpointTs model.Ts,
currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
replications map[model.TableID]*ReplicationSet,
) []*scheduleTask
}

var _ internal.Scheduler = (*coordinator)(nil)

type coordinator struct {
version string
revision schedulepb.OwnerRevision
captureID model.CaptureID
trans transport
scheduler []scheduler
schedulers map[schedulerType]scheduler
replicationM *replicationManager
captureM *captureManager
}
@@ -67,12 +57,18 @@ func NewCoordinator(
return nil, errors.Trace(err)
}
revision := schedulepb.OwnerRevision{Revision: ownerRevision}

schedulers := make(map[schedulerType]scheduler)
schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
schedulers[schedulerTypeMoveTable] = newMoveTableScheduler()
schedulers[schedulerTypeRebalance] = newRebalanceScheduler()

return &coordinator{
version: version.ReleaseSemver(),
revision: revision,
captureID: captureID,
trans: trans,
scheduler: []scheduler{newBalancer()},
schedulers: schedulers,
replicationM: newReplicationManager(cfg.MaxTaskConcurrency),
captureM: newCaptureManager(revision, cfg.HeartbeatTick),
}, nil
@@ -90,11 +86,50 @@ func (c *coordinator) Tick(
return c.poll(ctx, checkpointTs, currentTables, aliveCaptures)
}

func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {}
func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
if !c.captureM.CheckAllCaptureInitialized() {
log.Info("tpscheduler: manual move table task ignored, "+
"since not all captures initialized",
zap.Int64("tableID", tableID),
zap.String("targetCapture", target))
return
}
scheduler, ok := c.schedulers[schedulerTypeMoveTable]
if !ok {
log.Panic("tpscheduler: move table scheduler not found")
}
moveTableScheduler, ok := scheduler.(*moveTableScheduler)
if !ok {
log.Panic("tpscheduler: invalid move table scheduler found")
}
if !moveTableScheduler.addTask(tableID, target) {
log.Info("tpscheduler: manual move Table task ignored, "+
"since the last triggered task not finished",
zap.Int64("tableID", tableID),
zap.String("targetCapture", target))
}
}

func (c *coordinator) Rebalance() {}
func (c *coordinator) Rebalance() {
if !c.captureM.CheckAllCaptureInitialized() {
log.Info("tpscheduler: manual rebalance task ignored, " +
"since not all captures initialized")
return
}
scheduler, ok := c.schedulers[schedulerTypeRebalance]
if !ok {
log.Panic("tpscheduler: rebalance scheduler not found")
}
rebalanceScheduler, ok := scheduler.(*rebalanceScheduler)
if !ok {
log.Panic("tpscheduler: invalid rebalance scheduler found")
}
rebalanceScheduler.rebalance = true
}

func (c *coordinator) Close(ctx context.Context) {}
func (c *coordinator) Close(ctx context.Context) {
_ = c.trans.Close()
}

// ===========

@@ -138,12 +173,12 @@ func (c *coordinator) poll(
// Generate schedule tasks based on the current status.
replications := c.replicationM.ReplicationSets()
allTasks := make([]*scheduleTask, 0)
for _, sched := range c.scheduler {
tasks := sched.Schedule(checkpointTs, currentTables, aliveCaptures, replications)
for _, scheduler := range c.schedulers {
tasks := scheduler.Schedule(checkpointTs, currentTables, aliveCaptures, replications)
if len(tasks) != 0 {
log.Info("tpscheduler: new schedule task",
zap.Int("task", len(tasks)),
zap.String("scheduler", sched.Name()))
zap.String("scheduler", scheduler.Name()))
}
allTasks = append(allTasks, tasks...)
}
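MoveTable and Rebalance now follow the same shape: bail out while any capture is still initializing, look the scheduler up in the schedulers map, type-assert it to its concrete type, and record the manual request; the actual table movement happens on the next Tick. The sketch below reproduces that lookup-and-assert pattern with a stand-in moveTableScheduler; addTask's reject-while-pending behavior is inferred from the log message above, since scheduler_move_table.go itself is not among the hunks shown on this page.

```go
package main

import "fmt"

// scheduler is the minimal interface shape the coordinator needs here.
type scheduler interface{ Name() string }

// moveTableScheduler is a stand-in for the concrete type the coordinator asserts to.
type moveTableScheduler struct {
	tasks map[int64]string // tableID -> target capture
}

func (m *moveTableScheduler) Name() string { return "move-table-scheduler" }

// addTask mimics the one-shot semantics implied by coordinator.MoveTable:
// a new manual task is rejected while a previous one is still pending.
func (m *moveTableScheduler) addTask(tableID int64, target string) bool {
	if len(m.tasks) != 0 {
		return false
	}
	m.tasks[tableID] = target
	return true
}

func main() {
	schedulers := map[string]scheduler{
		"move-table-scheduler": &moveTableScheduler{tasks: map[int64]string{}},
	}

	// Same lookup-then-assert steps as coordinator.MoveTable.
	s, ok := schedulers["move-table-scheduler"]
	if !ok {
		panic("move table scheduler not found")
	}
	mts, ok := s.(*moveTableScheduler)
	if !ok {
		panic("invalid move table scheduler found")
	}

	fmt.Println(mts.addTask(1, "capture-2")) // true: task accepted
	fmt.Println(mts.addTask(2, "capture-3")) // false: last task not finished yet
}
```

Rebalance is even lighter weight: it only sets rebalanceScheduler.rebalance to true and leaves the actual shuffling to the rebalance scheduler on the following poll.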
12 changes: 9 additions & 3 deletions cdc/scheduler/internal/tp/coordinator_test.go
@@ -196,9 +196,11 @@ func BenchmarkCoordinatorInit(b *testing.B) {
for i := 0; i < total; i++ {
currentTables = append(currentTables, int64(10000+i))
}
schedulers := make(map[schedulerType]scheduler)
schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
coord = &coordinator{
trans: &mockTrans{},
scheduler: []scheduler{newBalancer()},
schedulers: schedulers,
replicationM: newReplicationManager(10),
// Disable heartbeat.
captureM: newCaptureManager(schedulepb.OwnerRevision{}, math.MaxInt),
@@ -228,9 +230,11 @@ func BenchmarkCoordinatorHeartbeat(b *testing.B) {
for i := 0; i < total; i++ {
currentTables = append(currentTables, int64(10000+i))
}
schedulers := make(map[schedulerType]scheduler)
schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
coord = &coordinator{
trans: &mockTrans{},
scheduler: []scheduler{},
schedulers: schedulers,
replicationM: newReplicationManager(10),
captureM: captureM,
}
@@ -296,9 +300,11 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
recvBuffer: recvMsgs,
keepRecvBuffer: true,
}
schedulers := make(map[schedulerType]scheduler)
schedulers[schedulerTypeBurstBalance] = newBurstBalanceScheduler()
coord = &coordinator{
trans: trans,
scheduler: []scheduler{},
schedulers: schedulers,
replicationM: replicationM,
captureM: captureM,
}
5 changes: 3 additions & 2 deletions cdc/scheduler/internal/tp/replication_manager.go
@@ -117,7 +117,7 @@ func (r *replicationManager) HandleCaptureChanges(
func (r *replicationManager) HandleMessage(
msgs []*schedulepb.Message,
) ([]*schedulepb.Message, error) {
sentMsgs := make([]*schedulepb.Message, 0)
sentMsgs := make([]*schedulepb.Message, 0, len(msgs))
for i := range msgs {
msg := msgs[i]
switch msg.MsgType {
@@ -230,7 +230,7 @@ func (r *replicationManager) HandleTasks(
}

// Check if accepting one more task exceeds maxTaskConcurrency.
if len(r.runningTasks)+1 > r.maxTaskConcurrency {
if len(r.runningTasks) == r.maxTaskConcurrency {
log.Debug("tpscheduler: too many running task")
// Does not use break, in case there is burst balance task
// in the remaining tasks.
@@ -366,6 +366,7 @@ func (r *replicationManager) handleBurstBalanceTasks(
return sentMsgs, nil
}

// ReplicationSets return all tracking replication set
// Caller must not modify the return replication sets.
func (r *replicationManager) ReplicationSets() map[model.TableID]*ReplicationSet {
return r.tables
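Two small changes in this file: HandleMessage pre-allocates sentMsgs with capacity len(msgs) so the slice never re-grows, and the concurrency guard in HandleTasks compares with == instead of +1 >. Assuming runningTasks never exceeds maxTaskConcurrency (which the guard itself enforces), the two forms accept and reject exactly the same counts; a tiny check of that equivalence:

```go
package main

import "fmt"

func main() {
	const maxTaskConcurrency = 3
	for running := 0; running <= maxTaskConcurrency; running++ {
		oldGuard := running+1 > maxTaskConcurrency // guard before this commit
		newGuard := running == maxTaskConcurrency  // guard after this commit
		fmt.Println(running, oldGuard == newGuard) // prints true for every reachable count
	}
}
```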
36 changes: 36 additions & 0 deletions cdc/scheduler/internal/tp/scheduler.go
@@ -0,0 +1,36 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tp

import (
"github.com/pingcap/tiflow/cdc/model"
)

type scheduler interface {
Name() string
Schedule(
checkpointTs model.Ts,
currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
replications map[model.TableID]*ReplicationSet,
) []*scheduleTask
}

type schedulerType string

const (
schedulerTypeBurstBalance schedulerType = "burst-balance-scheduler"
schedulerTypeMoveTable schedulerType = "move-table-scheduler"
schedulerTypeRebalance schedulerType = "rebalance-scheduler"
)
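With the scheduler interface extracted into its own file, the coordinator's poll loop can range over a heterogeneous map of schedulers (burst balance, move table, rebalance). The sketch below shows one plausible shape for the rebalance scheduler against a simplified copy of that interface; the stand-in types and the one-shot flag handling are assumptions, since the real rebalanceScheduler and scheduleTask definitions are not part of the hunks shown on this page.

```go
package main

import "fmt"

// Stand-in types so the sketch is self-contained; in the real package these
// come from cdc/model and the tp package itself (scheduleTask, ReplicationSet, ...).
type (
	Ts             = uint64
	TableID        = int64
	CaptureID      = string
	CaptureInfo    struct{}
	ReplicationSet struct{}
	scheduleTask   struct{ name string }
)

// scheduler mirrors the interface introduced in scheduler.go.
type scheduler interface {
	Name() string
	Schedule(
		checkpointTs Ts,
		currentTables []TableID,
		aliveCaptures map[CaptureID]*CaptureInfo,
		replications map[TableID]*ReplicationSet,
	) []*scheduleTask
}

// rebalanceScheduler guesses at the shape of the manual rebalance scheduler:
// the coordinator sets a flag, and the next Schedule call consumes it.
type rebalanceScheduler struct{ rebalance bool }

func (r *rebalanceScheduler) Name() string { return "rebalance-scheduler" }

func (r *rebalanceScheduler) Schedule(
	checkpointTs Ts,
	currentTables []TableID,
	aliveCaptures map[CaptureID]*CaptureInfo,
	replications map[TableID]*ReplicationSet,
) []*scheduleTask {
	if !r.rebalance {
		return nil // nothing was requested since the last tick
	}
	r.rebalance = false // one-shot: clear the flag once it is consumed
	return []*scheduleTask{{name: "rebalance"}}
}

func main() {
	var s scheduler = &rebalanceScheduler{}
	fmt.Println(len(s.Schedule(0, nil, nil, nil))) // 0: no rebalance requested
	s.(*rebalanceScheduler).rebalance = true       // this is what coordinator.Rebalance() does
	fmt.Println(len(s.Schedule(0, nil, nil, nil))) // 1: one rebalance task emitted
}
```

Because the manual API only flips a flag or queues a single task, it stays non-blocking; the task itself is produced inside Schedule on the next tick.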
15 changes: 7 additions & 8 deletions cdc/scheduler/internal/tp/scheduler_balance.go
@@ -19,19 +19,19 @@ import (
"go.uber.org/zap"
)

var _ scheduler = &balancer{}
var _ scheduler = &burstBalanceScheduler{}

type balancer struct{}
type burstBalanceScheduler struct{}

func newBalancer() *balancer {
return &balancer{}
func newBurstBalanceScheduler() *burstBalanceScheduler {
return &burstBalanceScheduler{}
}

func (b *balancer) Name() string {
return "balancer"
func (b *burstBalanceScheduler) Name() string {
return string(schedulerTypeBurstBalance)
}

func (b *balancer) Schedule(
func (b *burstBalanceScheduler) Schedule(
checkpointTs model.Ts,
currentTables []model.TableID,
captures map[model.CaptureID]*model.CaptureInfo,
@@ -117,7 +117,6 @@ func newBurstBalanceRemoveTables(
} else {
log.Warn("tpscheduler: primary or secondary not found for removed table",
zap.Any("table", rep))
continue
}
}
return &scheduleTask{burstBalance: &burstBalance{
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/tp/scheduler_balance_test.go
@@ -29,7 +29,7 @@ func TestSchedulerBalance(t *testing.T) {

// AddTable only
replications := map[model.TableID]*ReplicationSet{}
b := newBalancer()
b := newBurstBalanceScheduler()
tasks := b.Schedule(0, currentTables, captures, replications)
require.Len(t, tasks, 1)
require.Contains(t, tasks[0].burstBalance.AddTables, model.TableID(1))
@@ -96,7 +96,7 @@ func benchmarkSchedulerBalance(
size := 16384
for total := 1; total <= size; total *= 2 {
name, currentTables, captures, replications := factory(total)
bal := newBalancer()
bal := newBurstBalanceScheduler()
b.ResetTimer()
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {