Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

latency(cdc): 2 phase scheduling processor bug fix #5728

Merged
merged 14 commits on Jun 6, 2022
29 changes: 16 additions & 13 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ const (

type sinkNode struct {
sink sink.Sink
status TableState
state *TableState
tableID model.TableID

// atomic oprations for model.ResolvedTs
// atomic operations for model.ResolvedTs
resolvedTs atomic.Value
checkpointTs model.Ts
targetTs model.Ts
Expand All @@ -49,11 +49,12 @@ type sinkNode struct {
replicaConfig *config.ReplicaConfig
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts,
targetTs model.Ts, flowController tableFlowController, state *TableState) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatePrepared,
state: state,
targetTs: targetTs,
checkpointTs: startTs,
barrierTs: startTs,
Expand All @@ -67,7 +68,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableState { return n.status.Load() }
func (n *sinkNode) State() TableState { return n.state.Load() }

func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) {
n.replicaConfig = replicaConfig
Expand All @@ -77,8 +78,8 @@ func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) {
// In this method, the builtin table sink will be closed by calling `Close`, and
// no more events can be sent to this sink node afterwards.
func (n *sinkNode) stop(ctx context.Context) (err error) {
// table stopped status must be set after underlying sink is closed
defer n.status.Store(TableStateStopped)
// table stopped state must be set after underlying sink is closed
defer n.state.Store(TableStateStopped)
err = n.sink.Close(ctx)
if err != nil {
return
Expand All @@ -93,7 +94,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) {
func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) {
defer func() {
if err != nil {
n.status.Store(TableStateStopped)
n.state.Store(TableStateStopped)
return
}
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
Expand Down Expand Up @@ -240,16 +241,18 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv
}

func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) {
if n.status.Load() == TableStateStopped {
if n.state.Load() == TableStateStopped {
return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
}
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if n.state.Load() != TableStateReplicating {
log.Panic("table not in replicating state",
zap.Int64("tableID", n.tableID),
zap.Any("state", n.state.Load()))
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
}
if event.IsResolved() {
if n.status.Load() == TableStatePrepared {
n.status.Store(TableStateReplicating)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})
Expand Down Expand Up @@ -291,7 +294,7 @@ func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
}

func (n *sinkNode) releaseResource(ctx context.Context) error {
n.status.Store(TableStateStopped)
n.state.Store(TableStateStopped)
n.flowController.Abort()
return n.sink.Close(ctx)
}
84 changes: 46 additions & 38 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,24 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error {
func TestStatus(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test-status"),
ID: model.DefaultChangeFeedID("changefeed-id-test-state"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
},
})

state := TableStatePreparing
// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, &state)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())

ok, err := node.HandleMessage(ctx, pmessage.BarrierMessage(20))
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
Expand All @@ -153,7 +154,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
Expand All @@ -162,7 +163,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Expand All @@ -171,7 +172,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStateReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.State())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Expand All @@ -180,20 +181,20 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, TableStateStopped, node.State())
require.Equal(t, model.Ts(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, &state)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())

msg = pmessage.BarrierMessage(20)
ok, err = node.HandleMessage(ctx, msg)
require.True(t, ok)
require.Nil(t, err)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
Expand All @@ -203,13 +204,13 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStateReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.State())

msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop})
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, TableStateStopped, node.State())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Expand All @@ -218,20 +219,20 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, TableStateStopped, node.State())
require.Equal(t, uint64(2), node.CheckpointTs())

// test the stop at ts command is after the resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, &state)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())

msg = pmessage.BarrierMessage(20)
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Expand All @@ -240,13 +241,13 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStateReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.State())

msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop})
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, TableStateStopped, node.State())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Expand All @@ -255,27 +256,29 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, TableStateStopped, node.State())
require.Equal(t, uint64(7), node.CheckpointTs())
}

// TestStopStatus tests the table status of a pipeline is not set to stopped
// TestStopStatus tests the table state of a pipeline is not set to stopped
// until the underlying sink is closed
func TestStopStatus(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test-status"),
ID: model.DefaultChangeFeedID("changefeed-id-test-state"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
},
})

state := TableStatePreparing
closeCh := make(chan interface{}, 1)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh},
0, 100, &mockFlowController{}, &state)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Expand All @@ -284,7 +287,7 @@ func TestStopStatus(t *testing.T) {
ok, err := node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStateReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.State())

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -295,11 +298,11 @@ func TestStopStatus(t *testing.T) {
ok, err := node.HandleMessage(ctx, msg)
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, TableStateStopped, node.State())
}()
// wait to ensure stop message is sent to the sink node
time.Sleep(time.Millisecond * 50)
require.Equal(t, TableStateReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.State())
closeCh <- struct{}{}
wg.Wait()
}
Expand All @@ -313,11 +316,12 @@ func TestManyTs(t *testing.T) {
Config: config.GetDefaultReplicaConfig(),
},
})
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, &state)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{
Expand All @@ -336,7 +340,7 @@ func TestManyTs(t *testing.T) {
},
},
})
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())
ok, err := node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
Expand All @@ -358,7 +362,7 @@ func TestManyTs(t *testing.T) {
},
},
})
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, TableStatePrepared, node.State())
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
Expand All @@ -370,7 +374,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStateReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.State())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand Down Expand Up @@ -415,7 +419,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStateReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.State())

sink.Check(t, []struct {
resolvedTs model.Ts
Expand Down Expand Up @@ -465,7 +469,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStateReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.State())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand All @@ -486,8 +490,9 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
Config: config.GetDefaultReplicaConfig(),
},
})
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, &state)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand All @@ -511,8 +516,9 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
Config: config.GetDefaultReplicaConfig(),
},
})
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, &state)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand Down Expand Up @@ -573,8 +579,9 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
Config: cfg,
},
})
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, &state)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand Down Expand Up @@ -721,10 +728,11 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
Config: cfg,
},
})
state := TableStatePreparing
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController)
sNode := newSinkNode(1, sink, 0, 10, flowController, &state)
sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
sNode.barrierTs = 10
Expand Down
Loading