cdc: integrate two phase scheduler (#5710)
* cdc: integrate two phase scheduler

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: fix duplicate topic handlers panic

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: add From to messages from scheduler

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: add more comments and fix typo

Signed-off-by: Neil Shen <overvenus@gmail.com>

* cdc: fix missing table ID in table meta

Signed-off-by: Neil Shen <overvenus@gmail.com>

* cdc: fix processor.tick panic

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: fix missing processor epoch

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: fix missing checkpoint ts in burstBalanceTask

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tp: fix lost AddTable request during Commit

Signed-off-by: Neil Shen <overvenus@gmail.com>

* Revert "cdc: fix processor.tick panic"

This reverts commit ebb0f2c.
overvenus authored Jun 2, 2022
1 parent 0b1c089 commit 39dc96f
Showing 17 changed files with 288 additions and 86 deletions.
21 changes: 14 additions & 7 deletions cdc/owner/changefeed.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -43,17 +44,23 @@ import (
// This function is factored out to facilitate unit testing.
func newSchedulerV2FromCtx(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error) {
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
ownerRev := ctx.GlobalVars().OwnerRevision
ret, err := scheduler.NewScheduler(
ctx, changeFeedID, startTs, messageServer, messageRouter, ownerRev)
if err != nil {
return nil, errors.Trace(err)
}
return ret, nil
captureID := ctx.GlobalVars().CaptureInfo.ID
cfg := config.GetGlobalServerConfig().Debug
if cfg.EnableTwoPhaseScheduler {
ret, err = scheduler.NewTpScheduler(
ctx, captureID, changeFeedID, startTs,
messageServer, messageRouter, ownerRev, cfg.Scheduler)
} else {
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID, startTs,
messageServer, messageRouter, ownerRev, cfg.Scheduler)
}
return ret, errors.Trace(err)
}

func newScheduler(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) {
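
The changefeed.go hunk above gates the new coordinator behind the debug config flag EnableTwoPhaseScheduler. A minimal, self-contained sketch of that selection pattern follows; the types and names here are illustrative stand-ins, not the tiflow API.

package main

import "fmt"

// Scheduler stands in for the scheduler.Scheduler interface.
type Scheduler interface{ Name() string }

type legacyScheduler struct{}

func (legacyScheduler) Name() string { return "legacy" }

type twoPhaseScheduler struct{}

func (twoPhaseScheduler) Name() string { return "two-phase" }

// DebugConfig mirrors the idea of a debug-only feature toggle.
type DebugConfig struct {
	EnableTwoPhaseScheduler bool
}

// newScheduler picks an implementation based on the toggle. Named returns
// keep a single exit point, like the errors.Trace(err) pattern in the hunk.
func newScheduler(cfg DebugConfig) (ret Scheduler, err error) {
	if cfg.EnableTwoPhaseScheduler {
		ret = twoPhaseScheduler{}
	} else {
		ret = legacyScheduler{}
	}
	return ret, err
}

func main() {
	s, _ := newScheduler(DebugConfig{EnableTwoPhaseScheduler: true})
	fmt.Println(s.Name()) // two-phase
}
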
17 changes: 12 additions & 5 deletions cdc/processor/processor.go
@@ -373,12 +373,14 @@ func (p *processor) GetTableMeta(tableID model.TableID) pipeline.TableMeta {
table, ok := p.tables[tableID]
if !ok {
return pipeline.TableMeta{
TableID: tableID,
CheckpointTs: 0,
ResolvedTs: 0,
Status: pipeline.TableStateAbsent,
}
}
return pipeline.TableMeta{
TableID: tableID,
CheckpointTs: table.CheckpointTs(),
ResolvedTs: table.ResolvedTs(),
Status: table.Status(),
@@ -670,15 +672,20 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
return nil
}

func (p *processor) newAgentImpl(ctx cdcContext.Context) (scheduler.Agent, error) {
func (p *processor) newAgentImpl(ctx cdcContext.Context) (ret scheduler.Agent, err error) {
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
etcdClient := ctx.GlobalVars().EtcdClient
ret, err := scheduler.NewAgent(ctx, messageServer, messageRouter, etcdClient, p, p.changefeedID)
if err != nil {
return nil, errors.Trace(err)
captureID := ctx.GlobalVars().CaptureInfo.ID
cfg := config.GetGlobalServerConfig().Debug
if cfg.EnableTwoPhaseScheduler {
ret, err = scheduler.NewTpAgent(
ctx, captureID, messageServer, messageRouter, etcdClient, p, p.changefeedID)
} else {
ret, err = scheduler.NewAgent(
ctx, captureID, messageServer, messageRouter, etcdClient, p, p.changefeedID)
}
return ret, nil
return ret, errors.Trace(err)
}

// handleErrorCh listen the error channel and throw the error if it is not expected.
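
The GetTableMeta change above fills in TableID even when the table is absent (matching the "fix missing table ID in table meta" item), so callers can always tell which table a response describes. A small stand-alone sketch of that contract, with hypothetical types rather than the tiflow structs:

package main

import "fmt"

type TableState int

const (
	TableStateAbsent TableState = iota
	TableStateReplicating
)

// TableMeta is a hypothetical stand-in for pipeline.TableMeta.
type TableMeta struct {
	TableID      int64
	CheckpointTs uint64
	State        TableState
}

// getTableMeta always echoes the requested table ID back, even when the
// table is not found, so a response can be correlated with its request.
func getTableMeta(tables map[int64]TableMeta, tableID int64) TableMeta {
	if meta, ok := tables[tableID]; ok {
		return meta
	}
	return TableMeta{TableID: tableID, State: TableStateAbsent}
}

func main() {
	tables := map[int64]TableMeta{
		1: {TableID: 1, CheckpointTs: 42, State: TableStateReplicating},
	}
	fmt.Println(getTableMeta(tables, 1))
	fmt.Println(getTableMeta(tables, 7)) // absent, but still reports TableID 7
}
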
3 changes: 3 additions & 0 deletions cdc/processor/processor_test.go
@@ -679,3 +679,6 @@ func TestUpdateBarrierTs(t *testing.T) {
tb = p.tables[model.TableID(1)].(*mockTablePipeline)
require.Equal(t, tb.barrierTs, uint64(15))
}

func TestGetTableMeta(t *testing.T) {
}
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/agent.go
@@ -76,7 +76,7 @@ func NewAgent(ctx context.Context,
tableExec: tableExecutor,
runningTasks: make(map[model.TableID]*dispatchTableTask),
}
trans, err := newTransport(ctx, changeFeedID, messageServer, messageRouter)
trans, err := newTransport(ctx, changeFeedID, agentRole, messageServer, messageRouter)
if err != nil {
return nil, errors.Trace(err)
}
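
newTransport now takes a role argument (agentRole here, schedulerRole in coordinator.go below). This presumably ties in with the "fix duplicate topic handlers panic" item in the commit message: giving the agent and the coordinator distinct, role-qualified topics avoids registering two handlers on the same topic when both run in one process. A hedged sketch of that idea; the topic naming scheme is illustrative, not tiflow's actual one.

package main

import "fmt"

type role string

const (
	agentRole     role = "agent"
	schedulerRole role = "scheduler"
)

// topicName derives a per-changefeed, per-role topic so an agent and a
// coordinator for the same changefeed never share one message handler.
func topicName(changefeed string, r role) string {
	return fmt.Sprintf("changefeed/%s/%s", changefeed, r)
}

func main() {
	fmt.Println(topicName("cf-1", agentRole))     // changefeed/cf-1/agent
	fmt.Println(topicName("cf-1", schedulerRole)) // changefeed/cf-1/scheduler
}
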
29 changes: 16 additions & 13 deletions cdc/scheduler/internal/tp/capture_manager.go
@@ -22,19 +22,19 @@ import (

// CaptureState is the state of a capture.
//
// ┌──────────────┐ Heartbeat Resp ┌─────────────┐
// │ Uninitialize ├───────────────>│ Initialized │
// └──────┬───────┘ └──────┬──────┘
// │ │
// IsStopping │ ┌──────────┐ │ IsStopping
// └────────> │ Stopping │ <───────┘
// ┌──────────────┐ Heartbeat Resp ┌─────────────┐
// │ Uninitialized ├───────────────>│ Initialized │
// └──────┬───────┘ └──────┬──────┘
// │ │
// IsStopping │ ┌──────────┐ │ IsStopping
// └────────> │ Stopping │ <───────┘
// └──────────┘
type CaptureState int

const (
// CaptureStateUninitialize means the capture status is unknown,
// CaptureStateUninitialized means the capture status is unknown,
// no heartbeat response received yet.
CaptureStateUninitialize CaptureState = 1
CaptureStateUninitialized CaptureState = 1
// CaptureStateInitialized means owner has received heartbeat response.
CaptureStateInitialized CaptureState = 2
// CaptureStateStopping means the capture is removing, e.g., shutdown.
@@ -50,22 +50,22 @@ type CaptureStatus struct {
}

func newCaptureStatus(rev schedulepb.OwnerRevision) *CaptureStatus {
return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialize}
return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialized}
}

func (c *CaptureStatus) handleHeartbeatResponse(
resp *schedulepb.HeartbeatResponse, epoch schedulepb.ProcessorEpoch,
) {
// Check epoch for initialized captures.
if c.State != CaptureStateUninitialize && c.Epoch.Epoch != epoch.Epoch {
if c.State != CaptureStateUninitialized && c.Epoch.Epoch != epoch.Epoch {
log.Warn("tpscheduler: ignore heartbeat response",
zap.String("epoch", c.Epoch.Epoch),
zap.String("respEpoch", epoch.Epoch),
zap.Int64("ownerRev", c.OwnerRev.Revision))
return
}

if c.State == CaptureStateUninitialize {
if c.State == CaptureStateUninitialized {
c.Epoch = epoch
c.State = CaptureStateInitialized
}
@@ -112,7 +112,10 @@ func (c *captureManager) CheckAllCaptureInitialized() bool {

func (c *captureManager) checkAllCaptureInitialized() bool {
for _, captureStatus := range c.Captures {
if captureStatus.State == CaptureStateUninitialize {
// CaptureStateStopping is also considered initialized, because when
// a capture shuts down, it becomes stopping, and we still need to move
// its tables to other captures.
if captureStatus.State == CaptureStateUninitialized {
return false
}
}
@@ -190,7 +193,7 @@ func (c *captureManager) HandleAliveCaptureUpdate(
log.Info("tpscheduler: removed a capture", zap.String("capture", id))
delete(c.Captures, id)

// Only update changes after initializtion.
// Only update changes after initialization.
if !c.initialized {
continue
}
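
The CaptureState machine above has only three states and two events. A compact, hypothetical sketch of the same transitions (not the capture manager itself):

package main

import "fmt"

type CaptureState int

const (
	CaptureStateUninitialized CaptureState = 1
	CaptureStateInitialized   CaptureState = 2
	CaptureStateStopping      CaptureState = 3
)

type captureStatus struct{ state CaptureState }

// onHeartbeatResponse moves Uninitialized to Initialized; other states keep
// their current value, mirroring the guard in handleHeartbeatResponse.
func (c *captureStatus) onHeartbeatResponse() {
	if c.state == CaptureStateUninitialized {
		c.state = CaptureStateInitialized
	}
}

// onIsStopping is reachable from any state, matching both IsStopping arrows
// in the diagram.
func (c *captureStatus) onIsStopping() { c.state = CaptureStateStopping }

func main() {
	c := &captureStatus{state: CaptureStateUninitialized}
	c.onHeartbeatResponse()
	fmt.Println(c.state == CaptureStateInitialized) // true
	c.onIsStopping()
	fmt.Println(c.state == CaptureStateStopping) // true
}
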
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/capture_manager_test.go
@@ -27,7 +27,7 @@ func TestCaptureStatusHandleHeartbeatResponse(t *testing.T) {
rev := schedulepb.OwnerRevision{Revision: 1}
epoch := schedulepb.ProcessorEpoch{Epoch: "test"}
c := newCaptureStatus(rev)
require.Equal(t, CaptureStateUninitialize, c.State)
require.Equal(t, CaptureStateUninitialized, c.State)

// Uninitialize -> Initialized
c.handleHeartbeatResponse(&schedulepb.HeartbeatResponse{}, epoch)
25 changes: 18 additions & 7 deletions cdc/scheduler/internal/tp/coordinator.go
@@ -44,6 +44,7 @@ var _ internal.Scheduler = (*coordinator)(nil)
type coordinator struct {
version string
revision schedulepb.OwnerRevision
captureID model.CaptureID
trans transport
scheduler []scheduler
replicationM *replicationManager
@@ -53,21 +54,23 @@ type coordinator struct {
// NewCoordinator returns a two phase scheduler.
func NewCoordinator(
ctx context.Context,
captureID model.CaptureID,
changeFeedID model.ChangeFeedID,
checkpointTs model.Ts,
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
ownerRevision int64,
cfg *config.SchedulerConfig,
) (internal.Scheduler, error) {
trans, err := newTransport(ctx, changeFeedID, messageServer, messageRouter)
trans, err := newTransport(ctx, changeFeedID, schedulerRole, messageServer, messageRouter)
if err != nil {
return nil, errors.Trace(err)
}
revision := schedulepb.OwnerRevision{Revision: ownerRevision}
return &coordinator{
version: version.ReleaseSemver(),
revision: revision,
captureID: captureID,
trans: trans,
scheduler: []scheduler{newBalancer()},
replicationM: newReplicationManager(cfg.MaxTaskConcurrency),
@@ -171,8 +174,8 @@ func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error)

n := 0
for _, val := range recvMsgs {
// Filter stale messages.
if val.Header.OwnerRevision == c.revision {
// Filter stale messages and lost messages.
if val.Header.OwnerRevision == c.revision && val.To == c.captureID {
recvMsgs[n] = val
n++
}
Expand All @@ -183,15 +186,23 @@ func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, erro
func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error {
for i := range msgs {
m := msgs[i]
m.Header = &schedulepb.Message_Header{
Version: c.version,
OwnerRevision: c.revision,
}
// Correctness check.
if len(m.To) == 0 || m.MsgType == schedulepb.MsgUnknown {
log.Panic("invalid message no destination or unknown message type",
zap.Any("message", m))
}

epoch := schedulepb.ProcessorEpoch{}
if capture := c.captureM.Captures[m.To]; capture != nil {
epoch = capture.Epoch
}
m.Header = &schedulepb.Message_Header{
Version: c.version,
OwnerRevision: c.revision,
ProcessorEpoch: epoch,
}
m.From = c.captureID

}
return c.trans.Send(ctx, msgs)
}
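
recvMsgs above now drops both stale messages (older owner revision) and messages addressed to another capture, while sendMsgs stamps each outgoing message with the owner revision, the destination's processor epoch, and the sender ID. A standalone sketch of the inbound filter, using a simplified message type instead of schedulepb:

package main

import "fmt"

// message is a simplified stand-in for schedulepb.Message.
type message struct {
	OwnerRevision int64
	To            string
	Payload       string
}

// filterInbox keeps only messages carrying the current owner revision and
// addressed to this capture; everything else is stale or misrouted.
func filterInbox(msgs []message, ownerRev int64, selfID string) []message {
	n := 0
	for _, m := range msgs {
		if m.OwnerRevision == ownerRev && m.To == selfID {
			msgs[n] = m
			n++
		}
	}
	return msgs[:n]
}

func main() {
	in := []message{
		{OwnerRevision: 3, To: "0", Payload: "keep"},
		{OwnerRevision: 4, To: "0", Payload: "stale"},
		{OwnerRevision: 3, To: "lost", Payload: "misrouted"},
	}
	fmt.Println(filterInbox(in, 3, "0")) // only the first message survives
}
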
49 changes: 35 additions & 14 deletions cdc/scheduler/internal/tp/coordinator_test.go
@@ -67,20 +67,33 @@ func TestCoordinatorSendMsgs(t *testing.T) {
t.Parallel()
ctx := context.Background()
trans := newMockTrans()
cood := coordinator{
version: "6.2.0",
revision: schedulepb.OwnerRevision{Revision: 3},
trans: trans,
coord := coordinator{
version: "6.2.0",
revision: schedulepb.OwnerRevision{Revision: 3},
captureID: "0",
trans: trans,
}
cood.sendMsgs(
coord.captureM = newCaptureManager(coord.revision, 0)
coord.sendMsgs(
ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}})

coord.captureM.Captures["1"] = &CaptureStatus{Epoch: schedulepb.ProcessorEpoch{Epoch: "epoch"}}
coord.sendMsgs(
ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}})

require.EqualValues(t, []*schedulepb.Message{{
Header: &schedulepb.Message_Header{
Version: cood.version,
OwnerRevision: cood.revision,
Version: coord.version,
OwnerRevision: coord.revision,
},
From: "0", To: "1", MsgType: schedulepb.MsgDispatchTableRequest,
}, {
Header: &schedulepb.Message_Header{
Version: coord.version,
OwnerRevision: coord.revision,
ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "epoch"},
},
To: "1", MsgType: schedulepb.MsgDispatchTableRequest,
From: "0", To: "1", MsgType: schedulepb.MsgDispatchTableRequest,
}}, trans.sendBuffer)
}

@@ -90,24 +103,32 @@ func TestCoordinatorRecvMsgs(t *testing.T) {
ctx := context.Background()
trans := &mockTrans{}
coord := coordinator{
version: "6.2.0",
revision: schedulepb.OwnerRevision{Revision: 3},
trans: trans,
version: "6.2.0",
revision: schedulepb.OwnerRevision{Revision: 3},
captureID: "0",
trans: trans,
}

trans.recvBuffer = append(trans.recvBuffer,
&schedulepb.Message{
Header: &schedulepb.Message_Header{
OwnerRevision: coord.revision,
},
From: "1", MsgType: schedulepb.MsgDispatchTableResponse,
From: "1", To: coord.captureID, MsgType: schedulepb.MsgDispatchTableResponse,
})
trans.recvBuffer = append(trans.recvBuffer,
&schedulepb.Message{
Header: &schedulepb.Message_Header{
OwnerRevision: schedulepb.OwnerRevision{Revision: 4},
},
From: "2", MsgType: schedulepb.MsgDispatchTableResponse,
From: "2", To: coord.captureID, MsgType: schedulepb.MsgDispatchTableResponse,
})
trans.recvBuffer = append(trans.recvBuffer,
&schedulepb.Message{
Header: &schedulepb.Message_Header{
OwnerRevision: coord.revision,
},
From: "3", To: "lost", MsgType: schedulepb.MsgDispatchTableResponse,
})

msgs, err := coord.recvMsgs(ctx)
Expand All @@ -116,7 +137,7 @@ func TestCoordinatorRecvMsgs(t *testing.T) {
Header: &schedulepb.Message_Header{
OwnerRevision: coord.revision,
},
From: "1", MsgType: schedulepb.MsgDispatchTableResponse,
From: "1", To: "0", MsgType: schedulepb.MsgDispatchTableResponse,
}}, msgs)
}

5 changes: 3 additions & 2 deletions cdc/scheduler/internal/tp/replication_manager.go
@@ -30,7 +30,7 @@ type callback func()
type burstBalance struct {
// Add tables to captures
AddTables, RemoveTables map[model.TableID]model.CaptureID
checkpointTs model.Ts
CheckpointTs model.Ts
}

type moveTable struct {
@@ -331,14 +331,15 @@ func (r *replicationManager) handleBurstBalanceTasks(
fields = append(fields, zap.Int("total", len(task.AddTables)+len(task.RemoveTables)))
log.Info("tpscheduler: handle burst balance task", fields...)

checkpointTs := task.CheckpointTs
sentMsgs := make([]*schedulepb.Message, 0, len(task.AddTables))
for tableID, captureID := range task.AddTables {
if r.runningTasks[tableID] != nil {
// Skip add table if the table is already running a task.
continue
}
msgs, err := r.handleAddTableTask(&addTable{
TableID: tableID, CaptureID: captureID,
TableID: tableID, CaptureID: captureID, CheckpointTs: checkpointTs,
})
if err != nil {
return nil, errors.Trace(err)
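
The replication_manager.go hunk exports CheckpointTs on burstBalance and copies it into every generated addTable task, matching the "fix missing checkpoint ts in burstBalanceTask" item. A minimal sketch of that propagation with hypothetical task structs:

package main

import "fmt"

// addTable and burstBalance are hypothetical stand-ins for the scheduler's
// internal task structs.
type addTable struct {
	TableID      int64
	CaptureID    string
	CheckpointTs uint64
}

type burstBalance struct {
	AddTables    map[int64]string // table ID -> destination capture
	CheckpointTs uint64
}

// expandBurstBalance fans a burst-balance task out into per-table addTable
// tasks, copying the checkpoint so every added table starts from the same ts.
func expandBurstBalance(task burstBalance) []addTable {
	out := make([]addTable, 0, len(task.AddTables))
	for tableID, captureID := range task.AddTables {
		out = append(out, addTable{
			TableID:      tableID,
			CaptureID:    captureID,
			CheckpointTs: task.CheckpointTs,
		})
	}
	return out
}

func main() {
	tasks := expandBurstBalance(burstBalance{
		AddTables:    map[int64]string{1: "capture-a", 2: "capture-b"},
		CheckpointTs: 400,
	})
	fmt.Println(len(tasks), tasks[0].CheckpointTs) // 2 400
}
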