Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler(2pc): agent for 2 phase scheduling #5593

Merged
merged 24 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
status: tablepipeline.TableStatusReplicating,
status: tablepipeline.TableStateReplicating,
resolvedTs: replicaInfo.StartTs,
checkpointTs: replicaInfo.StartTs,
}, nil
Expand Down
18 changes: 9 additions & 9 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (

type sinkNode struct {
sink sink.Sink
status TableStatus
status TableState
tableID model.TableID

// atomic oprations for model.ResolvedTs
Expand All @@ -55,7 +55,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusPrepared,
status: TableStatePrepared,
targetTs: targetTs,
checkpointTs: startTs,
barrierTs: startTs,
Expand All @@ -69,7 +69,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }
func (n *sinkNode) Status() TableState { return n.status.Load() }

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
n.replicaConfig = ctx.ChangefeedVars().Info.Config
Expand All @@ -87,7 +87,7 @@ func (n *sinkNode) initWithReplicaConfig(isTableActorMode bool, replicaConfig *c
// no more events can be sent to this sink node afterwards.
func (n *sinkNode) stop(ctx context.Context) (err error) {
// table stopped status must be set after underlying sink is closed
defer n.status.Store(TableStatusStopped)
defer n.status.Store(TableStateStopped)
err = n.sink.Close(ctx)
if err != nil {
return
Expand All @@ -102,7 +102,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) {
func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) {
defer func() {
if err != nil {
n.status.Store(TableStatusStopped)
n.status.Store(TableStateStopped)
return
}
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
Expand Down Expand Up @@ -255,15 +255,15 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
}

func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) {
if n.status.Load() == TableStatusStopped {
if n.status.Load() == TableStateStopped {
return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
}
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.IsResolved() {
if n.status.Load() == TableStatusPrepared {
n.status.Store(TableStatusReplicating)
if n.status.Load() == TableStatePrepared {
n.status.Store(TableStateReplicating)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
Expand Down Expand Up @@ -310,7 +310,7 @@ func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
}

func (n *sinkNode) releaseResource(ctx context.Context) error {
n.status.Store(TableStatusStopped)
n.status.Store(TableStateStopped)
n.flowController.Abort()
return n.sink.Close(ctx)
}
52 changes: 26 additions & 26 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ func TestStatus(t *testing.T) {
// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

err := node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil))
require.NoError(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
Expand All @@ -151,95 +151,95 @@ func TestStatus(t *testing.T) {
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.NoError(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.NoError(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.NoError(t, err)
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, model.Ts(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
err = node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))
require.NoError(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

require.Nil(t, node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}), nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, uint64(2), node.CheckpointTs())

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
err = node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))
require.Nil(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

err = node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil))
require.Nil(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}), nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, uint64(7), node.CheckpointTs())
}

Expand All @@ -258,14 +258,14 @@ func TestStopStatus(t *testing.T) {
closeCh := make(chan interface{}, 1)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -275,11 +275,11 @@ func TestStopStatus(t *testing.T) {
err := node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}), nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())
}()
// wait to ensure stop message is sent to the sink node
time.Sleep(time.Millisecond * 50)
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())
closeCh <- struct{}{}
wg.Wait()
}
Expand All @@ -296,7 +296,7 @@ func TestManyTs(t *testing.T) {
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
Expand All @@ -316,7 +316,7 @@ func TestManyTs(t *testing.T) {
},
},
}), nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
Expand All @@ -336,14 +336,14 @@ func TestManyTs(t *testing.T) {
},
},
}), nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestManyTs(t *testing.T) {

require.Nil(t, node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(1), nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

sink.Check(t, []struct {
resolvedTs model.Ts
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestManyTs(t *testing.T) {

require.Nil(t, node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(5), nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand Down
12 changes: 6 additions & 6 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type sorterNode struct {
// The latest barrier ts that sorter has received.
barrierTs model.Ts

status TableStatus
status TableState
preparedCh chan struct{}

// started indicate that the sink is really replicating, not idle.
Expand All @@ -88,7 +88,7 @@ func newSorterNode(
mounter: mounter,
resolvedTs: startTs,
barrierTs: startTs,
status: TableStatusPreparing,
status: TableStatePreparing,
preparedCh: make(chan struct{}, 1),
startTsCh: make(chan model.Ts, 1),
replConfig: replConfig,
Expand Down Expand Up @@ -186,7 +186,7 @@ func (n *sorterNode) start(
case <-n.preparedCh:
}

n.status.Store(TableStatusReplicating)
n.status.Store(TableStateReplicating)
eventSorter.EmitStartTs(stdCtx, startTs)

for {
Expand Down Expand Up @@ -323,9 +323,9 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi
}
// sorterNode is preparing, this is must the first `Resolved event` received
// the indicator that all regions connected.
if n.status.Load() == TableStatusPreparing {
if n.status.Load() == TableStatePreparing {
log.Info("sorterNode, first resolved event received", zap.Any("event", event))
n.status.Store(TableStatusPrepared)
n.status.Store(TableStatePrepared)
close(n.preparedCh)
}
}
Expand Down Expand Up @@ -377,4 +377,4 @@ func (n *sorterNode) BarrierTs() model.Ts {
return atomic.LoadUint64(&n.barrierTs)
}

func (n *sorterNode) Status() TableStatus { return n.status.Load() }
func (n *sorterNode) Status() TableState { return n.status.Load() }
Loading