From 30667729b42dfe07440ecaf36e655121569c8985 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Mon, 6 Jun 2022 21:56:31 +0800 Subject: [PATCH] latency(cdc): 2 phase scheduling processor bug fix (#5728) * fix by suggestion, use unique state for the table. * fix TestTableExecutorAddingTableDirectly * add ut TestTableExecutorAddingTableIndirectly * add ut TestTableExecutorAddingTableIndirectly --- cdc/processor/manager_test.go | 2 +- cdc/processor/pipeline/sink.go | 44 +++-- cdc/processor/pipeline/sink_test.go | 94 +++++---- cdc/processor/pipeline/sorter.go | 25 ++- cdc/processor/pipeline/sorter_test.go | 24 ++- cdc/processor/pipeline/table.go | 8 +- cdc/processor/pipeline/table_actor.go | 26 ++- cdc/processor/pipeline/table_actor_test.go | 143 ++++++++------ cdc/processor/processor.go | 41 ++-- cdc/processor/processor_test.go | 217 ++++++++++++++++++--- cdc/scheduler/internal/tp/agent.go | 54 +++-- cdc/scheduler/internal/tp/transport.go | 10 +- 12 files changed, 468 insertions(+), 220 deletions(-) diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index 63055a3d7e2..6df5e60f7dc 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -55,7 +55,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) { return &mockTablePipeline{ tableID: tableID, name: fmt.Sprintf("`test`.`table%d`", tableID), - status: tablepipeline.TableStateReplicating, + state: tablepipeline.TableStateReplicating, resolvedTs: replicaInfo.StartTs, checkpointTs: replicaInfo.StartTs, }, nil diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 36ee6280370..804118aa346 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -32,15 +32,17 @@ import ( type sinkNode struct { sink sink.Sink - status TableState + state *TableState tableID model.TableID - // atomic oprations for model.ResolvedTs + // atomic operations for model.ResolvedTs resolvedTs atomic.Value checkpointTs atomic.Value targetTs model.Ts barrierTs model.Ts + changefeed model.ChangeFeedID + flowController tableFlowController redoManager redo.LogManager @@ -52,13 +54,16 @@ func newSinkNode( startTs model.Ts, targetTs model.Ts, flowController tableFlowController, redoManager redo.LogManager, + state *TableState, + changefeed model.ChangeFeedID, ) *sinkNode { sn := &sinkNode{ tableID: tableID, sink: sink, - status: TableStatePrepared, + state: state, targetTs: targetTs, barrierTs: startTs, + changefeed: changefeed, flowController: flowController, redoManager: redoManager, } @@ -70,7 +75,7 @@ func newSinkNode( func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() } func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() } func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } -func (n *sinkNode) Status() TableState { return n.status.Load() } +func (n *sinkNode) State() TableState { return n.state.Load() } func (n *sinkNode) getResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) @@ -88,13 +93,16 @@ func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) { // In this method, the builtin table sink will be closed by calling `Close`, and // no more events can be sent to this sink node afterwards. 
func (n *sinkNode) stop(ctx context.Context) (err error) { - // table stopped status must be set after underlying sink is closed - defer n.status.Store(TableStateStopped) + // table stopped state must be set after underlying sink is closed + defer n.state.Store(TableStateStopped) err = n.sink.Close(ctx) if err != nil { return } - log.Info("sink is closed", zap.Int64("tableID", n.tableID)) + log.Info("sink is closed", + zap.Int64("tableID", n.tableID), + zap.String("namespace", n.changefeed.Namespace), + zap.String("changefeed", n.changefeed.ID)) err = cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs() return } @@ -104,7 +112,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) { func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) { defer func() { if err != nil { - n.status.Store(TableStateStopped) + n.state.Store(TableStateStopped) return } if n.CheckpointTs() >= n.targetTs { @@ -187,7 +195,11 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv } if event == nil || event.Row == nil { - log.Warn("skip emit nil event", zap.Any("event", event)) + log.Warn("skip emit nil event", + zap.Int64("tableID", n.tableID), + zap.String("namespace", n.changefeed.Namespace), + zap.String("changefeed", n.changefeed.ID), + zap.Any("event", event)) return nil } @@ -197,7 +209,11 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv // begin; insert into t (id) values (1); delete from t where id=1; commit; // Just ignore these row changed events. if colLen == 0 && preColLen == 0 { - log.Warn("skip emit empty row event", zap.Any("event", event)) + log.Warn("skip emit empty row event", + zap.Int64("tableID", n.tableID), + zap.String("namespace", n.changefeed.Namespace), + zap.String("changefeed", n.changefeed.ID), + zap.Any("event", event)) return nil } @@ -285,15 +301,15 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv } func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) { - if n.status.Load() == TableStateStopped { + if n.state.Load() == TableStateStopped { return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs() } switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent if event.IsResolved() { - if n.status.Load() == TableStatePrepared { - n.status.Store(TableStateReplicating) + if n.state.Load() == TableStatePrepared { + n.state.Store(TableStateReplicating) } failpoint.Inject("ProcessorSyncResolvedError", func() { failpoint.Return(false, errors.New("processor sync resolved injected error")) @@ -340,7 +356,7 @@ func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error { } func (n *sinkNode) releaseResource(ctx context.Context) error { - n.status.Store(TableStateStopped) + n.state.Store(TableStateStopped) n.flowController.Abort() return n.sink.Close(ctx) } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index a142afa9759..cf27ed57f06 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -125,7 +125,7 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error { } } -func TestStatus(t *testing.T) { +func TestState(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-status"), @@ -135,16 +135,18 @@ func TestStatus(t 
*testing.T) { }, }) + state := TableStatePrepared // test stop at targetTs - node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) + node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil). ChangefeedVars().Info.Config) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) ok, err := node.HandleMessage(ctx, pmessage.BarrierMessage(20)) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) require.Equal(t, model.Ts(20), node.BarrierTs()) msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -154,7 +156,7 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, @@ -163,7 +165,7 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -172,7 +174,7 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStateReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.State()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -181,20 +183,22 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStateStopped, node.Status()) + require.Equal(t, TableStateStopped, node.State()) require.Equal(t, model.Ts(10), node.CheckpointTs()) // test the stop at ts command - node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) + state = TableStatePrepared + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) msg = pmessage.BarrierMessage(20) ok, err = node.HandleMessage(ctx, msg) require.True(t, ok) require.Nil(t, err) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) require.Equal(t, model.Ts(20), node.BarrierTs()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -204,13 +208,13 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStateReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.State()) msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}) ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, 
cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStateStopped, node.Status()) + require.Equal(t, TableStateStopped, node.State()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -219,20 +223,22 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStateStopped, node.Status()) + require.Equal(t, TableStateStopped, node.State()) require.Equal(t, uint64(2), node.CheckpointTs()) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts - node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) + state = TableStatePrepared + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) msg = pmessage.BarrierMessage(20) ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -241,13 +247,13 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStateReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.State()) msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}) ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStateStopped, node.Status()) + require.Equal(t, TableStateStopped, node.State()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -256,29 +262,31 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStateStopped, node.Status()) + require.Equal(t, TableStateStopped, node.State()) require.Equal(t, uint64(7), node.CheckpointTs()) } -// TestStopStatus tests the table status of a pipeline is not set to stopped +// TestStopStatus tests the table state of a pipeline is not set to stopped // until the underlying sink is closed func TestStopStatus(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ - ID: model.DefaultChangeFeedID("changefeed-id-test-status"), + ID: model.DefaultChangeFeedID("changefeed-id-test-state"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), Config: config.GetDefaultReplicaConfig(), }, }) + state := TableStatePrepared closeCh := make(chan interface{}, 1) node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, - &mockFlowController{}, redo.NewDisabledManager()) + &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, 
pmessage.Message{}, nil).ChangefeedVars().Info.Config) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -287,7 +295,7 @@ func TestStopStatus(t *testing.T) { ok, err := node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStateReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.State()) var wg sync.WaitGroup wg.Add(1) @@ -298,11 +306,11 @@ func TestStopStatus(t *testing.T) { ok, err := node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStateStopped, node.Status()) + require.Equal(t, TableStateStopped, node.State()) }() // wait to ensure stop message is sent to the sink node time.Sleep(time.Millisecond * 50) - require.Equal(t, TableStateReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.State()) closeCh <- struct{}{} wg.Wait() } @@ -316,11 +324,13 @@ func TestManyTs(t *testing.T) { Config: config.GetDefaultReplicaConfig(), }, }) + state := TableStatePrepared sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{ @@ -339,7 +349,7 @@ func TestManyTs(t *testing.T) { }, }, }) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) ok, err := node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) @@ -361,7 +371,7 @@ func TestManyTs(t *testing.T) { }, }, }) - require.Equal(t, TableStatePrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.State()) ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) @@ -373,7 +383,7 @@ func TestManyTs(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStateReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.State()) sink.Check(t, []struct { resolvedTs model.Ts row *model.RowChangedEvent @@ -418,7 +428,7 @@ func TestManyTs(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStateReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.State()) sink.Check(t, []struct { resolvedTs model.Ts @@ -468,7 +478,7 @@ func TestManyTs(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStateReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.State()) sink.Check(t, []struct { resolvedTs model.Ts row *model.RowChangedEvent @@ -489,8 +499,10 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) { Config: config.GetDefaultReplicaConfig(), }, }) + state := TableStatePreparing sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) + node := newSinkNode(1, sink, 0, 
10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -514,8 +526,10 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { Config: config.GetDefaultReplicaConfig(), }, }) + state := TableStatePreparing sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -576,8 +590,10 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { Config: cfg, }, }) + state := TableStatePreparing sink := &mockSink{} - node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -724,10 +740,12 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { Config: cfg, }, }) + state := TableStatePreparing flowController := &flushFlowController{} sink := &flushSink{} // sNode is a sinkNode - sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager()) + sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID) sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) sNode.barrierTs = 10 diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index dda9745145d..525e51a5499 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -53,12 +53,13 @@ type sorterNode struct { cancel context.CancelFunc // The latest resolved ts that sorter has received. + // once the resolvedTs advanced, the sorter is fully prepared. resolvedTs model.Ts // The latest barrier ts that sorter has received. barrierTs model.Ts - status TableState + state *TableState preparedCh chan struct{} // started indicate that the sink is really replicating, not idle. @@ -67,12 +68,14 @@ type sorterNode struct { startTsCh chan model.Ts replConfig *config.ReplicaConfig + + changefeed model.ChangeFeedID } func newSorterNode( tableName string, tableID model.TableID, startTs model.Ts, flowController tableFlowController, mounter entry.Mounter, - replConfig *config.ReplicaConfig, + replConfig *config.ReplicaConfig, state *TableState, changefeed model.ChangeFeedID, ) *sorterNode { return &sorterNode{ tableName: tableName, @@ -81,10 +84,12 @@ func newSorterNode( mounter: mounter, resolvedTs: startTs, barrierTs: startTs, - status: TableStatePreparing, + state: state, preparedCh: make(chan struct{}, 1), startTsCh: make(chan model.Ts, 1), replConfig: replConfig, + + changefeed: changefeed, } } @@ -182,7 +187,7 @@ func (n *sorterNode) start( case <-n.preparedCh: } - n.status.Store(TableStateReplicating) + n.state.Store(TableStateReplicating) eventSorter.EmitStartTs(stdCtx, startTs) for { @@ -306,11 +311,13 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi // resolved ts. 
event = model.NewResolvedPolymorphicEvent(0, n.BarrierTs()) } - // sorterNode is preparing, this is must the first `Resolved event` received - // the indicator that all regions connected. - if n.status.Load() == TableStatePreparing { + // sorterNode is preparing, and a resolved ts greater than the `sorterNode` + // startTs (which is used to initialize the `sorterNode.resolvedTs`) received, + // this indicates that all regions connected, + // and sorter have data can be consumed by downstream. + if n.state.Load() == TableStatePreparing { log.Info("sorterNode, first resolved event received", zap.Any("event", event)) - n.status.Store(TableStatePrepared) + n.state.Store(TableStatePrepared) close(n.preparedCh) } } @@ -355,4 +362,4 @@ func (n *sorterNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } -func (n *sorterNode) Status() TableState { return n.status.Load() } +func (n *sorterNode) State() TableState { return n.state.Load() } diff --git a/cdc/processor/pipeline/sorter_test.go b/cdc/processor/pipeline/sorter_test.go index f885f0c0a23..ffccfbe6d43 100644 --- a/cdc/processor/pipeline/sorter_test.go +++ b/cdc/processor/pipeline/sorter_test.go @@ -59,16 +59,20 @@ func TestUnifiedSorterFileLockConflict(t *testing.T) { func TestSorterResolvedTs(t *testing.T) { t.Parallel() - sn := newSorterNode("tableName", 1, 1, nil, nil, &config.ReplicaConfig{ - Consistent: &config.ConsistentConfig{}, - }) + state := TableStatePreparing + sn := newSorterNode("tableName", 1, 1, nil, nil, + &config.ReplicaConfig{Consistent: &config.ConsistentConfig{}}, &state, + model.DefaultChangeFeedID("changefeed-id-test")) sn.sorter = memory.NewEntrySorter() - require.EqualValues(t, 1, sn.ResolvedTs()) + require.Equal(t, model.Ts(1), sn.ResolvedTs()) + require.Equal(t, TableStatePreparing, sn.State()) + msg := pmessage.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 2)) ok, err := sn.TryHandleDataMessage(context.Background(), msg) require.True(t, ok) require.Nil(t, err) - require.EqualValues(t, 2, sn.ResolvedTs()) + require.EqualValues(t, model.Ts(2), sn.ResolvedTs()) + require.Equal(t, TableStatePrepared, sn.State()) } type checkSorter struct { @@ -108,13 +112,14 @@ func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) { t.Parallel() sch := make(chan *model.PolymorphicEvent, 1) s := &checkSorter{ch: sch} - sn := newSorterNode("tableName", 1, 1, nil, nil, &config.ReplicaConfig{ - Consistent: &config.ConsistentConfig{}, - }) + state := TableStatePreparing + sn := newSorterNode("tableName", 1, 1, nil, nil, + &config.ReplicaConfig{Consistent: &config.ConsistentConfig{}}, &state, + model.DefaultChangeFeedID("changefeed-id-test")) sn.sorter = s ch := make(chan pmessage.Message, 1) - require.EqualValues(t, 1, sn.ResolvedTs()) + require.Equal(t, model.Ts(1), sn.ResolvedTs()) // Resolved ts must not regress even if there is no barrier ts message. resolvedTs1 := pmessage.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 1)) @@ -122,6 +127,7 @@ func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) { require.True(t, ok) require.Nil(t, err) require.EqualValues(t, model.NewResolvedPolymorphicEvent(0, 1), <-sch) + require.Equal(t, TableStatePrepared, sn.State()) // Advance barrier ts. 
nctx := pipeline.NewNodeContext( diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 6eb9d82d224..3e8bb0a72ae 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -26,7 +26,7 @@ const ( resolvedTsInterpolateInterval = 200 * time.Millisecond ) -// TableState is status of the table pipeline +// TableState is state of the table pipeline type TableState int32 // TableState for table pipeline @@ -68,7 +68,7 @@ func (s *TableState) Store(new TableState) { atomic.StoreInt32((*int32)(s), int32(new)) } -// TableMeta is the meta data of a table. +// TableMeta is the metadata of a table. type TableMeta struct { TableID model.TableID CheckpointTs model.Ts @@ -96,8 +96,8 @@ type TablePipeline interface { // Workload returns the workload of this table Workload() model.WorkloadInfo - // Status returns the status of this table pipeline - Status() TableState + // State returns the state of this table pipeline + State() TableState // Cancel stops this table pipeline immediately and destroy all resources created by this table pipeline Cancel() // Wait waits for table pipeline destroyed diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index bca2f6fa3ec..2dc98d27914 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -60,6 +60,8 @@ type tableActor struct { tableSink sink.Sink redoManager redo.LogManager + state TableState + pullerNode *pullerNode sortNode *sorterNode sinkNode *sinkNode @@ -124,6 +126,7 @@ func NewTableActor(cdcCtx cdcContext.Context, wg: wg, cancel: cancel, + state: TableStatePreparing, tableID: tableID, markTableID: replicaInfo.MarkTableID, tableName: tableName, @@ -282,7 +285,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error { flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoManager.Enabled()) sorterNode := newSorterNode(t.tableName, t.tableID, t.replicaInfo.StartTs, flowController, - t.mounter, t.replicaConfig, + t.mounter, t.replicaConfig, &t.state, t.changefeedID, ) t.sortNode = sorterNode sortActorNodeContext := newContext(sdtTableContext, t.tableName, @@ -315,7 +318,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error { actorSinkNode := newSinkNode(t.tableID, t.tableSink, t.replicaInfo.StartTs, - t.targetTs, flowController, t.redoManager) + t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID) actorSinkNode.initWithReplicaConfig(t.replicaConfig) t.sinkNode = actorSinkNode @@ -381,12 +384,12 @@ func (t *tableActor) handleError(err error) { } } -// ============ Implement TablePipline, must be threadsafe ============ +// ============ Implement TablePipeline, must be thread-safe ============ // ResolvedTs returns the resolved ts in this table pipeline func (t *tableActor) ResolvedTs() model.Ts { // TODO: after TiCDC introduces p2p based resolved ts mechanism, TiCDC nodes - // will be able to cooperate replication status directly. Then we will add + // will be able to cooperate replication state directly. Then we will add // another replication barrier for consistent replication instead of reusing // the global resolved-ts. if t.redoManager.Enabled() { @@ -441,16 +444,9 @@ func (t *tableActor) Workload() model.WorkloadInfo { return workload } -// Status returns the status of this table pipeline -func (t *tableActor) Status() TableState { - sortStatus := t.sortNode.Status() - // first resolved ts not received yet, still preparing... 
- if sortStatus == TableStatePreparing { - return TableStatePreparing - } - - // sinkNode is status indicator now. - return t.sinkNode.Status() +// State returns the state of this table pipeline +func (t *tableActor) State() TableState { + return t.state.Load() } // ID returns the ID of source table and mark table @@ -466,7 +462,7 @@ func (t *tableActor) Name() string { // Cancel stops this table pipeline immediately and destroy all resources // created by this table pipeline func (t *tableActor) Cancel() { - // cancel wait group, release resource and mark the status as stopped + // cancel wait group, release resource and mark the state as stopped t.stop(nil) // actor is closed, tick actor to remove this actor router msg := pmessage.TickMessage() diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index fb80150fd1c..aa3b7f844da 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -50,8 +50,10 @@ func TestAsyncStopFailed(t *testing.T) { redoManager: redo.NewDisabledManager(), cancel: func() {}, reportErr: func(err error) {}, + state: TableStatePreparing, } - tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager) + tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager, + &tbl.state, model.DefaultChangeFeedID("changefeed-test")) require.True(t, tbl.AsyncStop(1)) mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0) @@ -63,42 +65,45 @@ func TestAsyncStopFailed(t *testing.T) { } func TestTableActorInterface(t *testing.T) { - sink := &sinkNode{status: TableStatePrepared} - sorter := &sorterNode{resolvedTs: 5} - tbl := &tableActor{ - markTableID: 2, + table := &tableActor{ tableID: 1, + markTableID: 2, redoManager: redo.NewDisabledManager(), - sinkNode: sink, - sortNode: sorter, tableName: "t1", + state: TableStatePreparing, replicaConfig: &serverConfig.ReplicaConfig{ Consistent: &serverConfig.ConsistentConfig{ Level: "node", }, }, } - tableID, markID := tbl.ID() + table.sinkNode = &sinkNode{state: &table.state} + table.sortNode = &sorterNode{state: &table.state, resolvedTs: 5} + + tableID, markID := table.ID() require.Equal(t, int64(1), tableID) require.Equal(t, int64(2), markID) - require.Equal(t, "t1", tbl.Name()) - require.Equal(t, TableStatePreparing, tbl.Status()) + require.Equal(t, "t1", table.Name()) + require.Equal(t, TableStatePreparing, table.State()) + + table.sortNode.state.Store(TableStatePrepared) + require.Equal(t, TableStatePrepared, table.State()) - sorter.status.Store(TableStatePrepared) - sink.status.Store(TableStateStopped) - require.Equal(t, TableStateStopped, tbl.Status()) - require.Equal(t, uint64(1), tbl.Workload().Workload) + require.Equal(t, uint64(1), table.Workload().Workload) - sink.checkpointTs.Store(model.NewResolvedTs(3)) - require.Equal(t, model.Ts(3), tbl.CheckpointTs()) + table.sinkNode.checkpointTs.Store(model.NewResolvedTs(3)) + require.Equal(t, model.Ts(3), table.CheckpointTs()) - require.Equal(t, model.Ts(5), tbl.ResolvedTs()) - tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual) + require.Equal(t, model.Ts(5), table.ResolvedTs()) + table.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tbl.redoManager, _ = redo.NewMockManager(ctx) - sink.resolvedTs.Store(model.NewResolvedTs(6)) - require.Equal(t, model.Ts(6), tbl.ResolvedTs()) + table.redoManager, _ = 
redo.NewMockManager(ctx) + table.sinkNode.resolvedTs.Store(model.NewResolvedTs(6)) + require.Equal(t, model.Ts(6), table.ResolvedTs()) + + table.sinkNode.state.Store(TableStateStopped) + require.Equal(t, TableStateStopped, table.State()) } func TestTableActorCancel(t *testing.T) { @@ -112,6 +117,7 @@ func TestTableActorCancel(t *testing.T) { }() tbl := &tableActor{ + state: TableStatePreparing, stopped: 0, tableID: 1, redoManager: redo.NewDisabledManager(), @@ -119,12 +125,18 @@ func TestTableActorCancel(t *testing.T) { cancel: func() {}, reportErr: func(err error) {}, } + tbl.sinkNode = &sinkNode{ + state: &tbl.state, + flowController: &mockFlowController{}, + sink: &mockSink{}, + } mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0) tbl.actorID = actor.ID(1) require.Nil(t, tableActorSystem.Spawn(mb, tbl)) tbl.mb = mb tbl.Cancel() require.Equal(t, stopped, tbl.stopped) + require.Equal(t, TableStateStopped, tbl.State()) } func TestTableActorWait(t *testing.T) { @@ -147,7 +159,7 @@ func TestTableActorWait(t *testing.T) { func TestHandleError(t *testing.T) { canceled := false reporterErr := false - tbl := &tableActor{ + table := &tableActor{ redoManager: redo.NewDisabledManager(), cancel: func() { canceled = true @@ -155,29 +167,33 @@ func TestHandleError(t *testing.T) { reportErr: func(err error) { reporterErr = true }, - sinkNode: &sinkNode{ - sink: &errorCloseSink{}, - status: TableStatePreparing, - flowController: &mockFlowController{}, - }, - sortNode: &sorterNode{ - flowController: &mockFlowController{}, - }, + + state: TableStateReplicating, stopped: stopped, } + flowController := &mockFlowController{} + table.sinkNode = &sinkNode{ + sink: &errorCloseSink{}, + state: &table.state, + flowController: flowController, + } + table.sortNode = &sorterNode{ + flowController: flowController, + } + // table is already stopped - tbl.handleError(nil) - require.Equal(t, TableStatePreparing, tbl.sinkNode.status) + table.handleError(nil) + require.Equal(t, TableStateReplicating, table.sinkNode.state.Load()) require.False(t, canceled) require.True(t, reporterErr) - tbl.stopped = 0 + table.stopped = 0 reporterErr = false - tbl.handleError(nil) + table.handleError(nil) require.True(t, canceled) require.True(t, reporterErr) - require.Equal(t, stopped, tbl.stopped) - require.Equal(t, TableStateStopped, tbl.sinkNode.status) + require.Equal(t, stopped, table.stopped) + require.Equal(t, TableStateStopped, table.sinkNode.state.Load()) } func TestPollStoppedActor(t *testing.T) { @@ -192,34 +208,34 @@ func TestPollStoppedActor(t *testing.T) { func TestPollTickMessage(t *testing.T) { startTime := time.Now().Add(-sinkFlushInterval) + table := tableActor{ + state: TableStatePreparing, + lastFlushSinkTime: time.Now().Add(-2 * sinkFlushInterval), + cancel: func() {}, + reportErr: func(err error) {}, + } - sn := &sinkNode{ - status: TableStatePreparing, + table.sinkNode = &sinkNode{ + state: &table.state, sink: &mockSink{}, flowController: &mockFlowController{}, targetTs: 11, } - sn.resolvedTs.Store(model.NewResolvedTs(10)) - sn.checkpointTs.Store(model.NewResolvedTs(10)) + table.sinkNode.resolvedTs.Store(model.NewResolvedTs(10)) + table.sinkNode.checkpointTs.Store(model.NewResolvedTs(10)) - tbl := tableActor{ - sinkNode: sn, - lastFlushSinkTime: time.Now().Add(-2 * sinkFlushInterval), - cancel: func() {}, - reportErr: func(err error) {}, - } - require.True(t, tbl.Poll(context.TODO(), []message.Message[pmessage.Message]{ + require.True(t, table.Poll(context.TODO(), []message.Message[pmessage.Message]{ 
message.ValueMessage[pmessage.Message](pmessage.TickMessage()), })) - require.True(t, tbl.lastFlushSinkTime.After(startTime)) - startTime = tbl.lastFlushSinkTime - require.True(t, tbl.Poll(context.TODO(), []message.Message[pmessage.Message]{ + require.True(t, table.lastFlushSinkTime.After(startTime)) + startTime = table.lastFlushSinkTime + require.True(t, table.Poll(context.TODO(), []message.Message[pmessage.Message]{ message.ValueMessage[pmessage.Message](pmessage.TickMessage()), })) - require.True(t, tbl.lastFlushSinkTime.Equal(startTime)) - tbl.lastFlushSinkTime = time.Now().Add(-2 * sinkFlushInterval) - tbl.sinkNode.status = TableStateStopped - require.False(t, tbl.Poll(context.TODO(), []message.Message[pmessage.Message]{ + require.True(t, table.lastFlushSinkTime.Equal(startTime)) + table.lastFlushSinkTime = time.Now().Add(-2 * sinkFlushInterval) + table.state.Store(TableStateStopped) + require.False(t, table.Poll(context.TODO(), []message.Message[pmessage.Message]{ message.ValueMessage[pmessage.Message](pmessage.TickMessage()), })) } @@ -228,16 +244,17 @@ func TestPollStopMessage(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) tbl := tableActor{ - sinkNode: &sinkNode{ - status: TableStateStopped, - sink: &mockSink{}, - flowController: &mockFlowController{}, - }, + state: TableStateStopped, cancel: func() { wg.Done() }, reportErr: func(err error) {}, } + tbl.sinkNode = &sinkNode{ + state: &tbl.state, + sink: &mockSink{}, + flowController: &mockFlowController{}, + } tbl.Poll(context.TODO(), []message.Message[pmessage.Message]{ message.StopMessage[pmessage.Message](), }) @@ -280,9 +297,9 @@ func TestPollDataFailed(t *testing.T) { return false, errors.New("error") } tbl := tableActor{ + state: TableStatePreparing, cancel: func() {}, reportErr: func(err error) {}, - sinkNode: &sinkNode{sink: &mockSink{}, flowController: &mockFlowController{}}, lastFlushSinkTime: time.Now(), nodes: []*ActorNode{ { @@ -291,6 +308,11 @@ func TestPollDataFailed(t *testing.T) { }, }, } + tbl.sinkNode = &sinkNode{ + sink: &mockSink{}, + flowController: &mockFlowController{}, + state: &tbl.state, + } require.False(t, tbl.Poll(context.TODO(), []message.Message[pmessage.Message]{ message.ValueMessage[pmessage.Message](pmessage.TickMessage()), })) @@ -370,6 +392,7 @@ func TestNewTableActor(t *testing.T) { }, &mockSink{}, redo.NewDisabledManager(), 10) require.NotNil(t, tbl) require.Nil(t, err) + require.Equal(t, TableStatePreparing, tbl.State()) require.NotPanics(t, func() { tbl.UpdateBarrierTs(model.Ts(5)) }) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 9e592360cf4..ac2a4d35f41 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -131,7 +131,7 @@ func (p *processor) AddTable( table, ok := p.tables[tableID] if ok { - switch table.Status() { + switch table.State() { // table is still `preparing`, which means the table is `replicating` on other captures. // no matter `isPrepare` or not, just ignore it should be ok. case pipeline.TableStatePreparing: @@ -260,25 +260,15 @@ func (p *processor) IsAddTableFinished(ctx context.Context, tableID model.TableI localResolvedTs := p.resolvedTs globalResolvedTs := p.changefeed.Status.ResolvedTs - localCheckpointTs := p.agent.GetLastSentCheckpointTs() globalCheckpointTs := p.changefeed.Status.CheckpointTs done := func() bool { if isPrepare { // todo: add ut to cover this, after 2ps supported. 
- return table.Status() == pipeline.TableStatePrepared + return table.State() == pipeline.TableStatePrepared } - - // todo: revise these 2 conditions, after 2ps supported. - // how about just check status is `TableStateReplicating`. - if table.CheckpointTs() < localCheckpointTs || localCheckpointTs < globalCheckpointTs { - return false - } - if table.ResolvedTs() < localResolvedTs || - localResolvedTs < globalResolvedTs { - return false - } - return true + // The table is `replicating`, it's indicating that the `add table` must be finished. + return table.State() == pipeline.TableStateReplicating } if !done() { log.Debug("Add Table not finished", @@ -290,9 +280,8 @@ func (p *processor) IsAddTableFinished(ctx context.Context, tableID model.TableI zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs), zap.Uint64("tableCheckpointTs", table.CheckpointTs()), - zap.Uint64("localCheckpointTs", localCheckpointTs), zap.Uint64("globalCheckpointTs", globalCheckpointTs), - zap.Any("status", table.Status()), zap.Bool("isPrepare", isPrepare)) + zap.Any("state", table.State()), zap.Bool("isPrepare", isPrepare)) return false } @@ -305,9 +294,8 @@ func (p *processor) IsAddTableFinished(ctx context.Context, tableID model.TableI zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs), zap.Uint64("tableCheckpointTs", table.CheckpointTs()), - zap.Uint64("localCheckpointTs", localCheckpointTs), zap.Uint64("globalCheckpointTs", globalCheckpointTs), - zap.Any("status", table.Status()), zap.Bool("isPrepare", isPrepare)) + zap.Any("state", table.State()), zap.Bool("isPrepare", isPrepare)) return true } @@ -326,7 +314,7 @@ func (p *processor) IsRemoveTableFinished(ctx context.Context, tableID model.Tab zap.Int64("tableID", tableID)) return 0, true } - status := table.Status() + status := table.State() if status != pipeline.TableStateStopped { log.Debug("table is still not stopped", zap.String("captureID", p.captureInfo.ID), @@ -381,7 +369,7 @@ func (p *processor) GetTableMeta(tableID model.TableID) pipeline.TableMeta { TableID: tableID, CheckpointTs: table.CheckpointTs(), ResolvedTs: table.ResolvedTs(), - State: table.Status(), + State: table.State(), } } @@ -788,7 +776,8 @@ func (p *processor) handlePosition(currentTs int64) { minResolvedTs = p.schemaStorage.ResolvedTs() } for _, table := range p.tables { - status := table.Status() + status := table.State() + // todo: add ut to cover this. 
if status == pipeline.TableStatePreparing || status == pipeline.TableStatePrepared { continue @@ -803,7 +792,8 @@ func (p *processor) handlePosition(currentTs int64) { minCheckpointTs := minResolvedTs minCheckpointTableID := int64(0) for _, table := range p.tables { - status := table.Status() + status := table.State() + // todo: add ut to cover this if status == pipeline.TableStatePreparing || status == pipeline.TableStatePrepared { continue @@ -841,7 +831,8 @@ func (p *processor) pushResolvedTs2Table() { resolvedTs = schemaResolvedTs } for _, table := range p.tables { - if table.Status() == pipeline.TableStateReplicating { + // todo: add ut to cover this + if table.State() == pipeline.TableStateReplicating { table.UpdateBarrierTs(resolvedTs) } } @@ -1040,7 +1031,7 @@ func (p *processor) Close() error { func (p *processor) WriteDebugInfo(w io.Writer) { fmt.Fprintf(w, "%+v\n", *p.changefeed) for tableID, tablePipeline := range p.tables { - fmt.Fprintf(w, "tableID: %d, tableName: %s, resolvedTs: %d, checkpointTs: %d, status: %s\n", - tableID, tablePipeline.Name(), tablePipeline.ResolvedTs(), tablePipeline.CheckpointTs(), tablePipeline.Status()) + fmt.Fprintf(w, "tableID: %d, tableName: %s, resolvedTs: %d, checkpointTs: %d, state: %s\n", + tableID, tablePipeline.Name(), tablePipeline.ResolvedTs(), tablePipeline.CheckpointTs(), tablePipeline.State()) } } diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index cf68b7183f5..9bb9ccf39b2 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -113,7 +113,7 @@ func newMockTablePipeline(ctx cdcContext.Context, tableID model.TableID, replica return &mockTablePipeline{ tableID: tableID, name: fmt.Sprintf("`test`.`table%d`", tableID), - status: pipeline.TableStatePreparing, + state: pipeline.TableStatePreparing, resolvedTs: replicaInfo.StartTs, checkpointTs: replicaInfo.StartTs, }, nil @@ -126,7 +126,7 @@ type mockTablePipeline struct { checkpointTs model.Ts barrierTs model.Ts stopTs model.Ts - status pipeline.TableState + state pipeline.TableState canceled bool sinkStartTs model.Ts @@ -161,8 +161,26 @@ func (m *mockTablePipeline) Workload() model.WorkloadInfo { return model.WorkloadInfo{Workload: 1} } -func (m *mockTablePipeline) Status() pipeline.TableState { - return m.status +func (m *mockTablePipeline) State() pipeline.TableState { + if m.state == pipeline.TableStateStopped { + return m.state + } + + if m.state == pipeline.TableStatePreparing { + // `resolvedTs` and `checkpointTs` is initialized by the same `start-ts` + // once `resolvedTs` > `checkpointTs`, is means the sorter received the first + // resolved event, let it become prepared. 
+ if m.resolvedTs > m.checkpointTs { + m.state = pipeline.TableStatePrepared + } + } + + if m.sinkStartTs != model.Ts(0) { + if m.checkpointTs > m.sinkStartTs { + m.state = pipeline.TableStateReplicating + } + } + return m.state } func (m *mockTablePipeline) Cancel() { @@ -183,7 +201,6 @@ func (m *mockTablePipeline) MemoryConsumption() uint64 { func (m *mockTablePipeline) Start(ts model.Ts) bool { m.sinkStartTs = ts - m.status = pipeline.TableStateReplicating return true } @@ -233,6 +250,112 @@ func (a *mockAgent) Close() error { return nil } +func TestTableExecutorAddingTableIndirectly(t *testing.T) { + ctx := cdcContext.NewBackendContext4Test(true) + p, tester := initProcessor4Test(ctx, t) + + var err error + // init tick + _, err = p.Tick(ctx, p.changefeed) + require.Nil(t, err) + tester.MustApplyPatches() + p.changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + status.CheckpointTs = 20 + status.ResolvedTs = 20 + return status, true, nil + }) + tester.MustApplyPatches() + + // no operation + _, err = p.Tick(ctx, p.changefeed) + require.Nil(t, err) + tester.MustApplyPatches() + + // table-1: `preparing` -> `prepared` -> `replicating` + ok, err := p.AddTable(ctx, 1, 20, true) + require.NoError(t, err) + require.True(t, ok) + + table1 := p.tables[1].(*mockTablePipeline) + require.Equal(t, model.Ts(20), table1.resolvedTs) + require.Equal(t, model.Ts(20), table1.checkpointTs) + require.Equal(t, model.Ts(0), table1.sinkStartTs) + + ok, err = p.AddTable(ctx, 2, 20, false) + require.NoError(t, err) + require.True(t, ok) + + table2 := p.tables[2].(*mockTablePipeline) + require.Equal(t, model.Ts(20), table2.resolvedTs) + require.Equal(t, model.Ts(20), table2.checkpointTs) + require.Equal(t, model.Ts(20), table2.sinkStartTs) + + require.Len(t, p.tables, 2) + + checkpointTs := p.agent.GetLastSentCheckpointTs() + require.Equal(t, checkpointTs, model.Ts(0)) + + done := p.IsAddTableFinished(ctx, 1, true) + require.False(t, done) + require.Equal(t, pipeline.TableStatePreparing, table1.State()) + + done = p.IsAddTableFinished(ctx, 2, false) + require.False(t, done) + require.Equal(t, pipeline.TableStatePreparing, table2.State()) + + // push the resolved ts, mock that sorterNode receive first resolved event + table1.resolvedTs = 101 + table2.resolvedTs = 101 + + _, err = p.Tick(ctx, p.changefeed) + require.Nil(t, err) + tester.MustApplyPatches() + + done = p.IsAddTableFinished(ctx, 1, true) + require.True(t, done) + require.Equal(t, pipeline.TableStatePrepared, table1.State()) + + done = p.IsAddTableFinished(ctx, 2, false) + require.False(t, done) + require.Equal(t, pipeline.TableStatePrepared, table1.State()) + + // no table is `replicating` + checkpointTs = p.agent.GetLastSentCheckpointTs() + require.Equal(t, checkpointTs, uint64(math.MaxUint64)) + + ok, err = p.AddTable(ctx, 1, 30, true) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, model.Ts(0), table1.sinkStartTs) + + ok, err = p.AddTable(ctx, 1, 30, false) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, model.Ts(30), table1.sinkStartTs) + + table1.checkpointTs = 60 + table2.checkpointTs = 70 + + _, err = p.Tick(ctx, p.changefeed) + require.Nil(t, err) + tester.MustApplyPatches() + + done = p.IsAddTableFinished(ctx, 1, false) + require.True(t, done) + require.Equal(t, pipeline.TableStateReplicating, table1.State()) + + done = p.IsAddTableFinished(ctx, 2, false) + require.True(t, done) + require.Equal(t, pipeline.TableStateReplicating, table2.State()) + + 
checkpointTs = p.agent.GetLastSentCheckpointTs() + require.Equal(t, table1.CheckpointTs(), checkpointTs) + + err = p.Close() + require.Nil(t, err) + require.Nil(t, p.agent) +} + func TestTableExecutorAddingTableDirectly(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) p, tester := initProcessor4Test(ctx, t) @@ -257,33 +380,35 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { ok, err := p.AddTable(ctx, 1, 20, false) require.Nil(t, err) require.True(t, ok) + table1 := p.tables[1].(*mockTablePipeline) require.Equal(t, model.Ts(20), table1.sinkStartTs) - require.Equal(t, pipeline.TableStateReplicating, table1.status) + require.Equal(t, pipeline.TableStatePreparing, table1.state) meta := p.GetTableMeta(model.TableID(1)) require.Equal(t, model.TableID(1), meta.TableID) - require.Equal(t, pipeline.TableStateReplicating, meta.State) + require.Equal(t, pipeline.TableStatePreparing, meta.State) ok, err = p.AddTable(ctx, 2, 20, false) require.Nil(t, err) require.True(t, ok) table2 := p.tables[2].(*mockTablePipeline) require.Equal(t, model.Ts(20), table2.sinkStartTs) - require.Equal(t, pipeline.TableStateReplicating, table2.status) + require.Equal(t, pipeline.TableStatePreparing, table2.state) ok, err = p.AddTable(ctx, 3, 20, false) require.Nil(t, err) require.True(t, ok) table3 := p.tables[3].(*mockTablePipeline) require.Equal(t, model.Ts(20), table3.sinkStartTs) - require.Equal(t, pipeline.TableStateReplicating, table3.status) + require.Equal(t, pipeline.TableStatePreparing, table3.state) ok, err = p.AddTable(ctx, 4, 20, false) require.Nil(t, err) require.True(t, ok) table4 := p.tables[4].(*mockTablePipeline) require.Equal(t, model.Ts(20), table4.sinkStartTs) - require.Equal(t, pipeline.TableStateReplicating, table4.status) + require.Equal(t, pipeline.TableStatePreparing, table4.state) + require.Len(t, p.tables, 4) checkpointTs := p.agent.GetLastSentCheckpointTs() @@ -291,24 +416,41 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { done := p.IsAddTableFinished(ctx, 1, false) require.False(t, done) + require.Equal(t, pipeline.TableStatePreparing, table1.State()) done = p.IsAddTableFinished(ctx, 2, false) require.False(t, done) + require.Equal(t, pipeline.TableStatePreparing, table2.State()) done = p.IsAddTableFinished(ctx, 3, false) require.False(t, done) + require.Equal(t, pipeline.TableStatePreparing, table3.State()) done = p.IsAddTableFinished(ctx, 4, false) require.False(t, done) + require.Equal(t, pipeline.TableStatePreparing, table4.State()) require.Len(t, p.tables, 4) _, err = p.Tick(ctx, p.changefeed) require.Nil(t, err) tester.MustApplyPatches() - // add table, push the resolvedTs, finished add table + // push the resolved ts, mock that sorterNode receive first resolved event table1.resolvedTs = 101 table2.resolvedTs = 101 table3.resolvedTs = 102 table4.resolvedTs = 103 + done = p.IsAddTableFinished(ctx, 1, false) + require.False(t, done) + require.Equal(t, pipeline.TableStatePrepared, table1.State()) + done = p.IsAddTableFinished(ctx, 2, false) + require.False(t, done) + require.Equal(t, pipeline.TableStatePrepared, table2.State()) + done = p.IsAddTableFinished(ctx, 3, false) + require.False(t, done) + require.Equal(t, pipeline.TableStatePrepared, table3.State()) + done = p.IsAddTableFinished(ctx, 4, false) + require.False(t, done) + require.Equal(t, pipeline.TableStatePrepared, table4.State()) + table1.checkpointTs = 30 table2.checkpointTs = 30 table3.checkpointTs = 30 @@ -316,12 +458,16 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { done = 
p.IsAddTableFinished(ctx, 1, false) require.True(t, done) + require.Equal(t, pipeline.TableStateReplicating, table1.State()) done = p.IsAddTableFinished(ctx, 2, false) require.True(t, done) + require.Equal(t, pipeline.TableStateReplicating, table2.State()) done = p.IsAddTableFinished(ctx, 3, false) require.True(t, done) + require.Equal(t, pipeline.TableStateReplicating, table3.State()) done = p.IsAddTableFinished(ctx, 4, false) require.True(t, done) + require.Equal(t, pipeline.TableStateReplicating, table4.State()) _, err = p.Tick(ctx, p.changefeed) require.Nil(t, err) @@ -337,7 +483,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { tester.MustApplyPatches() checkpointTs = p.agent.GetLastSentCheckpointTs() - require.Equal(t, model.Ts(60), checkpointTs) + require.Equal(t, table3.CheckpointTs(), checkpointTs) updateChangeFeedPosition(t, tester, ctx.ChangefeedVars().ID, 103, 60) @@ -369,7 +515,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.Equal(t, model.Ts(60), checkpointTs) // finish remove operations - table3.status = pipeline.TableStateStopped + table3.state = pipeline.TableStateStopped table3.checkpointTs = 65 _, err = p.Tick(ctx, p.changefeed) @@ -556,15 +702,25 @@ func TestPositionDeleted(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() + table1 := p.tables[1].(*mockTablePipeline) + table2 := p.tables[2].(*mockTablePipeline) + + table1.resolvedTs += 1 + table2.resolvedTs += 1 + + table1.checkpointTs += 1 + table2.checkpointTs += 1 + // cal position _, err = p.Tick(ctx, p.changefeed) require.Nil(t, err) tester.MustApplyPatches() - require.EqualValues(t, 30, p.checkpointTs) - require.EqualValues(t, 30, p.resolvedTs) + + require.Equal(t, model.Ts(31), p.checkpointTs) + require.Equal(t, model.Ts(31), p.resolvedTs) require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID) - // some other delete the task position + // some others delete the task position p.changefeed.PatchTaskPosition(p.captureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return nil, true, nil }) @@ -579,8 +735,8 @@ func TestPositionDeleted(t *testing.T) { _, err = p.Tick(ctx, p.changefeed) require.Nil(t, err) tester.MustApplyPatches() - require.EqualValues(t, 30, p.checkpointTs) - require.EqualValues(t, 30, p.resolvedTs) + require.Equal(t, model.Ts(31), p.checkpointTs) + require.Equal(t, model.Ts(31), p.resolvedTs) require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID) } @@ -649,15 +805,26 @@ func TestUpdateBarrierTs(t *testing.T) { status.ResolvedTs = 10 return status, true, nil }) - p.schemaStorage.(*mockSchemaStorage).resolvedTs = 10 done, err := p.AddTable(ctx, model.TableID(1), 5, false) require.Nil(t, err) require.True(t, done) + + table1 := p.tables[1].(*mockTablePipeline) + require.Equal(t, pipeline.TableStatePreparing, table1.State()) _, err = p.Tick(ctx, p.changefeed) require.Nil(t, err) tester.MustApplyPatches() + table1.resolvedTs += 1 + table1.checkpointTs += 1 + + done = p.IsAddTableFinished(ctx, model.TableID(1), false) + require.True(t, done) + require.Equal(t, pipeline.TableStateReplicating, table1.State()) + + p.schemaStorage.(*mockSchemaStorage).resolvedTs = 11 + // Global resolved ts has advanced while schema storage stalls. 
p.changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { status.ResolvedTs = 20 @@ -666,14 +833,18 @@ func TestUpdateBarrierTs(t *testing.T) { _, err = p.Tick(ctx, p.changefeed) require.Nil(t, err) tester.MustApplyPatches() - tb := p.tables[model.TableID(1)].(*mockTablePipeline) - require.Equal(t, tb.barrierTs, uint64(10)) + + _, err = p.Tick(ctx, p.changefeed) + require.NoError(t, err) + tester.MustApplyPatches() + + require.Equal(t, model.Ts(11), table1.barrierTs) // Schema storage has advanced too. p.schemaStorage.(*mockSchemaStorage).resolvedTs = 15 _, err = p.Tick(ctx, p.changefeed) require.Nil(t, err) tester.MustApplyPatches() - tb = p.tables[model.TableID(1)].(*mockTablePipeline) - require.Equal(t, tb.barrierTs, uint64(15)) + + require.Equal(t, model.Ts(15), table1.barrierTs) } diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go index 2f8dfe9257b..8d5cb4fea2d 100644 --- a/cdc/scheduler/internal/tp/agent.go +++ b/cdc/scheduler/internal/tp/agent.go @@ -85,7 +85,7 @@ func NewAgent(ctx context.Context, etcdCliCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - ownerCaptureID, err := etcdClient.GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey) + ownerCaptureID, err := etcdClient.GetOwnerID(etcdCliCtx) if err != nil { if err != concurrency.ErrElectionNoLeader { return nil, errors.Trace(err) @@ -94,25 +94,26 @@ func NewAgent(ctx context.Context, // If we are registered in Etcd, an elected Owner will have to // contact us before it can schedule any table. log.Info("tpscheduler: no owner found. We will wait for an owner to contact us.", - zap.String("capture", captureID), + zap.String("ownerCaptureID", ownerCaptureID), zap.String("namespace", changeFeedID.Namespace), zap.String("changefeed", changeFeedID.ID), zap.Error(err)) - return nil, nil + return result, nil } - log.Debug("tpscheduler: owner found", - zap.String("capture", captureID), + log.Info("tpscheduler: owner found", + zap.String("ownerCaptureID", ownerCaptureID), + zap.String("captureID", captureID), zap.String("namespace", changeFeedID.Namespace), - zap.String("changefeed", changeFeedID.ID), - zap.String("ownerID", captureID)) + zap.String("changefeed", changeFeedID.ID)) - revision, err := etcdClient.GetOwnerRevision(etcdCliCtx, captureID) + revision, err := etcdClient.GetOwnerRevision(etcdCliCtx, ownerCaptureID) if err != nil { if cerror.ErrOwnerNotFound.Equal(err) || cerror.ErrNotOwner.Equal(err) { // These are expected errors when no owner has been elected log.Info("tpscheduler: no owner found when querying for the owner revision", - zap.String("capture", captureID), + zap.String("ownerCaptureID", ownerCaptureID), + zap.String("captureID", captureID), zap.String("namespace", changeFeedID.Namespace), zap.String("changefeed", changeFeedID.ID), zap.Error(err)) @@ -186,8 +187,8 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { return result } -func (a *agent) tableStatus2PB(status pipeline.TableState) schedulepb.TableState { - switch status { +func (a *agent) tableStatus2PB(state pipeline.TableState) schedulepb.TableState { + switch state { case pipeline.TableStatePreparing: return schedulepb.TableStatePreparing case pipeline.TableStatePrepared: @@ -203,6 +204,7 @@ func (a *agent) tableStatus2PB(status pipeline.TableState) schedulepb.TableState default: } log.Warn("tpscheduler: table state unknown", + zap.Any("state", state), zap.String("capture", a.captureID), zap.String("namespace", 
a.changeFeedID.Namespace), zap.String("changefeed", a.changeFeedID.ID), @@ -215,7 +217,7 @@ func (a *agent) newTableStatus(tableID model.TableID) schedulepb.TableStatus { state := a.tableStatus2PB(meta.State) if task, ok := a.runningTasks[tableID]; ok { - // remove table task is not processed, or failed, + // there is task that try to remove the table, // return `stopping` instead of the real table state, // to indicate that the remove table request was received. if task.IsRemove == true { @@ -260,13 +262,22 @@ func (a *agent) handleMessageHeartbeat(expected []model.TableID) *schedulepb.Mes Tables: tables, IsStopping: a.stopping, } - return &schedulepb.Message{ + + message := &schedulepb.Message{ Header: a.newMessageHeader(), MsgType: schedulepb.MsgHeartbeatResponse, From: a.captureID, To: a.ownerInfo.captureID, HeartbeatResponse: response, } + + log.Debug("tpscheduler: agent generate heartbeat response", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("message", message)) + + return message } type dispatchTableTaskStatus int32 @@ -303,7 +314,7 @@ func (a *agent) handleMessageDispatchTableRequest( switch req := request.Request.(type) { case *schedulepb.DispatchTableRequest_AddTable: if a.stopping { - log.Info("tpscheduler: agent decline handle add table request", + log.Info("tpscheduler: agent is stopping, and decline handle add table request", zap.String("capture", a.captureID), zap.String("namespace", a.changeFeedID.Namespace), zap.String("changefeed", a.changeFeedID.ID)) @@ -333,15 +344,20 @@ func (a *agent) handleMessageDispatchTableRequest( return } - if _, ok := a.runningTasks[task.TableID]; ok { - log.Warn("tpscheduler: agent found duplicate dispatch table request", + if task, ok := a.runningTasks[task.TableID]; ok { + log.Warn("tpscheduler: agent found duplicate task, ignore the current request", zap.String("capture", a.captureID), zap.String("namespace", a.changeFeedID.Namespace), zap.String("changefeed", a.changeFeedID.ID), + zap.Any("task", task), zap.Any("request", request)) return } - + log.Debug("tpscheduler: agent found dispatch table task", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("task", task)) a.runningTasks[task.TableID] = task } @@ -364,7 +380,9 @@ func (a *agent) handleRemoveTableTask( } status := schedulepb.TableStatus{ TableID: task.TableID, - // todo: if the table is `absent`, also return a stopped here, `stopped` is identical to `absent`. + // after `IsRemoveTableFinished` return true, the table is removed, + // and `absent` will be return, we still return `stopped` here for the design purpose. + // but the `absent` is identical to `stopped`. we should only keep one. 
State: schedulepb.TableStateStopped, Checkpoint: schedulepb.Checkpoint{ CheckpointTs: checkpointTs, diff --git a/cdc/scheduler/internal/tp/transport.go b/cdc/scheduler/internal/tp/transport.go index da8ba4f0bb5..5474fc04d26 100644 --- a/cdc/scheduler/internal/tp/transport.go +++ b/cdc/scheduler/internal/tp/transport.go @@ -138,10 +138,12 @@ func (t *p2pTransport) Send( } } - log.Debug("tpscheduler: all messages sent", - zap.String("namespace", t.changefeed.Namespace), - zap.String("changefeed", t.changefeed.ID), - zap.Int("len", len(msgs))) + if len(msgs) != 0 { + log.Debug("tpscheduler: all messages sent", + zap.String("namespace", t.changefeed.Namespace), + zap.String("changefeed", t.changefeed.ID), + zap.Int("len", len(msgs))) + } return nil }
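
For readers following the state-machine change in this patch without the full TiCDC tree, below is a minimal, self-contained Go sketch of the pattern the commit adopts ("use unique state for the table"): one atomically accessed TableState owned by the table actor and shared by pointer with the sorter and sink nodes, so all of them report the same state. The names TableState, the TableState* constants, Load/Store, sorterNode, sinkNode, tableActor and State() are taken from the patch; the toy wiring, the onFirstResolvedEvent/onResolvedEvent helpers and the main function are illustrative assumptions, not TiCDC code.

// statemachine_sketch.go — illustrative sketch only; not part of the patch.
package main

import (
	"fmt"
	"sync/atomic"
)

// TableState mirrors cdc/processor/pipeline: an int32 read and written atomically.
type TableState int32

const (
	TableStatePreparing TableState = iota
	TableStatePrepared
	TableStateReplicating
	TableStateStopping
	TableStateStopped
)

// Load reads the state atomically.
func (s *TableState) Load() TableState { return TableState(atomic.LoadInt32((*int32)(s))) }

// Store writes the state atomically.
func (s *TableState) Store(new TableState) { atomic.StoreInt32((*int32)(s), int32(new)) }

// Toy nodes: both hold a pointer to the actor-owned state, giving a single
// source of truth instead of a per-node status field.
type sorterNode struct{ state *TableState }
type sinkNode struct{ state *TableState }

// The sorter marks the table prepared when its first resolved event arrives
// (stand-in for sorterNode.handleRawEvent in the patch).
func (n *sorterNode) onFirstResolvedEvent() {
	if n.state.Load() == TableStatePreparing {
		n.state.Store(TableStatePrepared)
	}
}

// The sink marks the table replicating once it starts consuming resolved events
// (stand-in for sinkNode.HandleMessage in the patch).
func (n *sinkNode) onResolvedEvent() {
	if n.state.Load() == TableStatePrepared {
		n.state.Store(TableStateReplicating)
	}
}

// stop marks the table stopped, as sinkNode.stop does after closing the sink.
func (n *sinkNode) stop() { n.state.Store(TableStateStopped) }

type tableActor struct {
	state TableState
	sort  *sorterNode
	sink  *sinkNode
}

// State is what the processor polls in IsAddTableFinished / IsRemoveTableFinished.
func (t *tableActor) State() TableState { return t.state.Load() }

func main() {
	t := &tableActor{state: TableStatePreparing}
	t.sort = &sorterNode{state: &t.state}
	t.sink = &sinkNode{state: &t.state}

	fmt.Println(t.State() == TableStatePreparing) // true: table just added
	t.sort.onFirstResolvedEvent()
	fmt.Println(t.State() == TableStatePrepared) // true: "add table (prepare)" finished
	t.sink.onResolvedEvent()
	fmt.Println(t.State() == TableStateReplicating) // true: "add table" finished
	t.sink.stop()
	fmt.Println(t.State() == TableStateStopped) // true: "remove table" finished
}

With the single shared state, processor.IsAddTableFinished in the processor.go hunk above reduces to a state comparison — TableStatePrepared when isPrepare, TableStateReplicating otherwise — instead of cross-checking per-table checkpoint and resolved timestamps against local and global values.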