scheduler(2pc): agent for 2 phase scheduling (pingcap#5593)
* fix some typos.

* update the table scheduler proto.

* add some new code to the agent.

* track owner info.

* try to handle the dispatch table request.

* add more to the agent implementation.

* fix updating owner info.

* finish handling the dispatch table request.

* tackle epoch handling.

* remove checkpoint from the proto.

* handle heartbeat while stopping.

* add a benchmark for the heartbeat response.

* fix agent.

* fix agent code layout.

* refine the benchmark test.

* refine coordinator / capture_manager / replication_manager.

* fix agent.

* add a lot of tests.

* revise the code.

* fix per review suggestions.

* fix per review suggestions.

* remove pendingTask.

* fix unit test.
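
The thread running through these commits is the two-phase protocol itself: the owner first asks an agent to prepare a table (start its sorter and buffer data), and only once the table reports Prepared does it commit the switch to real replication. The sketch below is a minimal stand-in for that flow, not the actual agent; the request shape (dispatchTableRequest, isPrepare) is an assumed simplification of the proto this PR updates.

```go
package main

import "fmt"

type tableState int

const (
	stateAbsent tableState = iota
	statePreparing
	statePrepared
	stateReplicating
)

// dispatchTableRequest is an assumed, simplified shape for the owner's
// message to an agent; the real proto in this PR differs.
type dispatchTableRequest struct {
	tableID   int64
	isPrepare bool // true: phase one (prepare); false: phase two (commit)
}

type agent struct {
	tables map[int64]tableState
}

// handleDispatchTable walks a table through the two phases: phase one moves
// it toward Prepared (sorter running, sink idle), phase two moves a Prepared
// table to Replicating.
func (a *agent) handleDispatchTable(req dispatchTableRequest) {
	state := a.tables[req.tableID]
	switch {
	case req.isPrepare && state == stateAbsent:
		a.tables[req.tableID] = statePreparing
		// ... start the sorter; it reports Prepared on its first resolved event.
	case !req.isPrepare && state == statePrepared:
		a.tables[req.tableID] = stateReplicating
		// ... start the sink so the buffered events flow downstream.
	default:
		fmt.Printf("unexpected dispatch for table %d in state %d\n", req.tableID, state)
	}
}

func main() {
	a := &agent{tables: map[int64]tableState{}}
	a.handleDispatchTable(dispatchTableRequest{tableID: 1, isPrepare: true})
	a.tables[1] = statePrepared // pretend the sorter signalled readiness
	a.handleDispatchTable(dispatchTableRequest{tableID: 1, isPrepare: false})
	fmt.Println(a.tables[1] == stateReplicating) // true
}
```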
3AceShowHand authored and overvenus committed Jun 21, 2022
1 parent 42601e4 commit 6e400de
Showing 26 changed files with 1,406 additions and 452 deletions.
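
Every hunk below is part of one mechanical rename, TableStatus -> TableState, plus the new Preparing/Prepared states that phase one introduces. For orientation, here is a minimal sketch of the atomic state holder that the n.status.Load()/n.status.Store(...) calls imply; the real type lives in cdc/processor/pipeline and may differ in detail.

```go
package pipeline

import "sync/atomic"

// TableState describes where a table pipeline is in its lifecycle.
type TableState int32

const (
	TableStatePreparing TableState = iota // sorter starting, waiting for regions
	TableStatePrepared                    // first resolved event seen, sink idle
	TableStateReplicating                 // sink emitting rows downstream
	TableStateStopped                     // sink closed, no more events accepted
)

// Load reads the state atomically so that pipeline nodes running on other
// goroutines always observe a consistent value.
func (s *TableState) Load() TableState {
	return TableState(atomic.LoadInt32((*int32)(s)))
}

// Store publishes a new state atomically.
func (s *TableState) Store(state TableState) {
	atomic.StoreInt32((*int32)(s), int32(state))
}
```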
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
@@ -55,7 +55,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
-status: tablepipeline.TableStatusReplicating,
+status: tablepipeline.TableStateReplicating,
resolvedTs: replicaInfo.StartTs,
checkpointTs: replicaInfo.StartTs,
}, nil
18 changes: 9 additions & 9 deletions cdc/processor/pipeline/sink.go
@@ -35,7 +35,7 @@ const (

type sinkNode struct {
sink sink.Sink
-status TableStatus
+status TableState
tableID model.TableID

// atomic operations for model.ResolvedTs
@@ -54,7 +54,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
tableID: tableID,
sink: sink,
// sink is always at least prepared, for receiving data from upstream.
-status: TableStatusPrepared,
+status: TableStatePrepared,
targetTs: targetTs,
barrierTs: startTs,

@@ -68,7 +68,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() }
func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
-func (n *sinkNode) Status() TableStatus { return n.status.Load() }
+func (n *sinkNode) Status() TableState { return n.status.Load() }

func (n *sinkNode) getResolvedTs() model.ResolvedTs {
return n.resolvedTs.Load().(model.ResolvedTs)
@@ -87,7 +87,7 @@ func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) {
// no more events can be sent to this sink node afterwards.
func (n *sinkNode) stop(ctx context.Context) (err error) {
// the table's stopped state must be set after the underlying sink is closed
-defer n.status.Store(TableStatusStopped)
+defer n.status.Store(TableStateStopped)
err = n.sink.Close(ctx)
if err != nil {
return
@@ -102,7 +102,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) {
func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) {
defer func() {
if err != nil {
-n.status.Store(TableStatusStopped)
+n.status.Store(TableStateStopped)
return
}
if n.CheckpointTs() >= n.targetTs {
@@ -249,15 +249,15 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv
}

func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) {
-if n.status.Load() == TableStatusStopped {
+if n.status.Load() == TableStateStopped {
return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
}
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.IsResolved() {
-if n.status.Load() == TableStatusPrepared {
-n.status.Store(TableStatusReplicating)
+if n.status.Load() == TableStatePrepared {
+n.status.Store(TableStateReplicating)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
@@ -306,7 +306,7 @@ func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
}

func (n *sinkNode) releaseResource(ctx context.Context) error {
-n.status.Store(TableStatusStopped)
+n.status.Store(TableStateStopped)
n.flowController.Abort()
return n.sink.Close(ctx)
}
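
One detail worth noting in stop and releaseResource above: the stopped state is published only after sink.Close returns, because the deferred Store runs last. A tiny self-contained illustration of that ordering, using a plain int32 in place of the real TableState type:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var state int32 // 0 = replicating, 1 = stopped (stand-ins for TableState values)

	stop := func() error {
		// Mirrors sinkNode.stop: the deferred Store runs after Close returns,
		// so the table is never observed as stopped while the sink is open.
		defer atomic.StoreInt32(&state, 1)
		fmt.Println("closing sink, state is still", atomic.LoadInt32(&state))
		return nil // stand-in for n.sink.Close(ctx)
	}

	_ = stop()
	fmt.Println("after stop, state is", atomic.LoadInt32(&state)) // 1
}
```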
52 changes: 26 additions & 26 deletions cdc/processor/pipeline/sink_test.go
@@ -138,12 +138,12 @@ func TestStatus(t *testing.T) {
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())

ok, err := node.HandleMessage(ctx, pmessage.BarrierMessage(20))
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
@@ -153,7 +153,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
@@ -162,7 +162,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -171,7 +171,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusReplicating, node.Status())
+require.Equal(t, TableStateReplicating, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -180,20 +180,20 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
-require.Equal(t, TableStatusStopped, node.Status())
+require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, model.Ts(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.BarrierMessage(20)
ok, err = node.HandleMessage(ctx, msg)
require.True(t, ok)
require.Nil(t, err)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
@@ -203,13 +203,13 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusReplicating, node.Status())
+require.Equal(t, TableStateReplicating, node.Status())

msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop})
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
-require.Equal(t, TableStatusStopped, node.Status())
+require.Equal(t, TableStateStopped, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -218,20 +218,20 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
-require.Equal(t, TableStatusStopped, node.Status())
+require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, uint64(2), node.CheckpointTs())

// test the case where the stop-at-ts command arrives after the resolved ts and the checkpoint ts is greater than the stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.BarrierMessage(20)
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -240,13 +240,13 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusReplicating, node.Status())
+require.Equal(t, TableStateReplicating, node.Status())

msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop})
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
-require.Equal(t, TableStatusStopped, node.Status())
+require.Equal(t, TableStateStopped, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -255,7 +255,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
-require.Equal(t, TableStatusStopped, node.Status())
+require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, uint64(7), node.CheckpointTs())
}

@@ -275,7 +275,7 @@ func TestStopStatus(t *testing.T) {
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -284,7 +284,7 @@ func TestStopStatus(t *testing.T) {
ok, err := node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusReplicating, node.Status())
+require.Equal(t, TableStateReplicating, node.Status())

var wg sync.WaitGroup
wg.Add(1)
@@ -295,11 +295,11 @@ func TestStopStatus(t *testing.T) {
ok, err := node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
-require.Equal(t, TableStatusStopped, node.Status())
+require.Equal(t, TableStateStopped, node.Status())
}()
// wait to ensure stop message is sent to the sink node
time.Sleep(time.Millisecond * 50)
-require.Equal(t, TableStatusReplicating, node.Status())
+require.Equal(t, TableStateReplicating, node.Status())
closeCh <- struct{}{}
wg.Wait()
}
@@ -317,7 +317,7 @@ func TestManyTs(t *testing.T) {
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{
@@ -336,7 +336,7 @@ func TestManyTs(t *testing.T) {
},
},
})
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())
ok, err := node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
@@ -358,7 +358,7 @@ func TestManyTs(t *testing.T) {
},
},
})
-require.Equal(t, TableStatusPrepared, node.Status())
+require.Equal(t, TableStatePrepared, node.Status())
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
@@ -370,7 +370,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusReplicating, node.Status())
+require.Equal(t, TableStateReplicating, node.Status())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
@@ -415,7 +415,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusReplicating, node.Status())
+require.Equal(t, TableStateReplicating, node.Status())

sink.Check(t, []struct {
resolvedTs model.Ts
@@ -465,7 +465,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
-require.Equal(t, TableStatusReplicating, node.Status())
+require.Equal(t, TableStateReplicating, node.Status())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
12 changes: 6 additions & 6 deletions cdc/processor/pipeline/sorter.go
@@ -58,7 +58,7 @@ type sorterNode struct {
// The latest barrier ts that sorter has received.
barrierTs model.Ts

-status TableStatus
+status TableState
preparedCh chan struct{}

// started indicates that the sink is really replicating, not idle.
@@ -81,7 +81,7 @@ func newSorterNode(
mounter: mounter,
resolvedTs: startTs,
barrierTs: startTs,
-status: TableStatusPreparing,
+status: TableStatePreparing,
preparedCh: make(chan struct{}, 1),
startTsCh: make(chan model.Ts, 1),
replConfig: replConfig,
@@ -182,7 +182,7 @@ func (n *sorterNode) start(
case <-n.preparedCh:
}

-n.status.Store(TableStatusReplicating)
+n.status.Store(TableStateReplicating)
eventSorter.EmitStartTs(stdCtx, startTs)

for {
Expand Down Expand Up @@ -308,9 +308,9 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi
}
// sorterNode is preparing; this must be the first resolved event received,
// the indicator that all regions are connected.
-if n.status.Load() == TableStatusPreparing {
+if n.status.Load() == TableStatePreparing {
log.Info("sorterNode, first resolved event received", zap.Any("event", event))
-n.status.Store(TableStatusPrepared)
+n.status.Store(TableStatePrepared)
close(n.preparedCh)
}
}
@@ -355,4 +355,4 @@ func (n *sorterNode) BarrierTs() model.Ts {
return atomic.LoadUint64(&n.barrierTs)
}

-func (n *sorterNode) Status() TableStatus { return n.status.Load() }
+func (n *sorterNode) Status() TableState { return n.status.Load() }
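
The sorter side of phase one is visible above: handleRawEvent closes preparedCh on the first resolved event, and start blocks on that channel before storing TableStateReplicating. A condensed, self-contained sketch of that handoff (plain ints stand in for the real TableState values):

```go
package main

import "fmt"

// Stand-ins for the pipeline's state values; the real type is TableState.
const (
	statePreparing = iota
	statePrepared
	stateReplicating
)

func main() {
	status := statePreparing
	preparedCh := make(chan struct{})

	// handleRawEvent side: the first resolved event means every region is
	// connected, so the sorter flips Preparing -> Prepared and closes the
	// channel to wake the start goroutine.
	go func() {
		if status == statePreparing {
			status = statePrepared
			close(preparedCh)
		}
	}()

	// start side: phase two (actual replication) begins only after phase one
	// has completed.
	<-preparedCh
	status = stateReplicating
	fmt.Println("table is replicating:", status == stateReplicating)
}
```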