Skip to content

Commit

Permalink
latency(cdc): 2 phase scheduling processor bug fix (#5728)
Browse files Browse the repository at this point in the history
* fix by suggestion, use unique state for the table.
* fix TestTableExecutorAddingTableDirectly
* add ut TestTableExecutorAddingTableIndirectly
* add ut TestTableExecutorAddingTableIndirectly
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Jun 24, 2022
1 parent 542b558 commit 3066772
Show file tree
Hide file tree
Showing 12 changed files with 468 additions and 220 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
status: tablepipeline.TableStateReplicating,
state: tablepipeline.TableStateReplicating,
resolvedTs: replicaInfo.StartTs,
checkpointTs: replicaInfo.StartTs,
}, nil
Expand Down
44 changes: 30 additions & 14 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ import (

type sinkNode struct {
sink sink.Sink
status TableState
state *TableState
tableID model.TableID

// atomic oprations for model.ResolvedTs
// atomic operations for model.ResolvedTs
resolvedTs atomic.Value
checkpointTs atomic.Value
targetTs model.Ts
barrierTs model.Ts

changefeed model.ChangeFeedID

flowController tableFlowController
redoManager redo.LogManager

Expand All @@ -52,13 +54,16 @@ func newSinkNode(
startTs model.Ts, targetTs model.Ts,
flowController tableFlowController,
redoManager redo.LogManager,
state *TableState,
changefeed model.ChangeFeedID,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatePrepared,
state: state,
targetTs: targetTs,
barrierTs: startTs,
changefeed: changefeed,
flowController: flowController,
redoManager: redoManager,
}
Expand All @@ -70,7 +75,7 @@ func newSinkNode(
func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() }
func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableState { return n.status.Load() }
func (n *sinkNode) State() TableState { return n.state.Load() }

func (n *sinkNode) getResolvedTs() model.ResolvedTs {
return n.resolvedTs.Load().(model.ResolvedTs)
Expand All @@ -88,13 +93,16 @@ func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) {
// In this method, the builtin table sink will be closed by calling `Close`, and
// no more events can be sent to this sink node afterwards.
func (n *sinkNode) stop(ctx context.Context) (err error) {
// table stopped status must be set after underlying sink is closed
defer n.status.Store(TableStateStopped)
// table stopped state must be set after underlying sink is closed
defer n.state.Store(TableStateStopped)
err = n.sink.Close(ctx)
if err != nil {
return
}
log.Info("sink is closed", zap.Int64("tableID", n.tableID))
log.Info("sink is closed",
zap.Int64("tableID", n.tableID),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID))
err = cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
return
}
Expand All @@ -104,7 +112,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) {
func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) {
defer func() {
if err != nil {
n.status.Store(TableStateStopped)
n.state.Store(TableStateStopped)
return
}
if n.CheckpointTs() >= n.targetTs {
Expand Down Expand Up @@ -187,7 +195,11 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
}

if event == nil || event.Row == nil {
log.Warn("skip emit nil event", zap.Any("event", event))
log.Warn("skip emit nil event",
zap.Int64("tableID", n.tableID),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID),
zap.Any("event", event))
return nil
}

Expand All @@ -197,7 +209,11 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event", zap.Any("event", event))
log.Warn("skip emit empty row event",
zap.Int64("tableID", n.tableID),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID),
zap.Any("event", event))
return nil
}

Expand Down Expand Up @@ -285,15 +301,15 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv
}

func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) {
if n.status.Load() == TableStateStopped {
if n.state.Load() == TableStateStopped {
return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
}
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.IsResolved() {
if n.status.Load() == TableStatePrepared {
n.status.Store(TableStateReplicating)
if n.state.Load() == TableStatePrepared {
n.state.Store(TableStateReplicating)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
Expand Down Expand Up @@ -340,7 +356,7 @@ func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
}

func (n *sinkNode) releaseResource(ctx context.Context) error {
n.status.Store(TableStateStopped)
n.state.Store(TableStateStopped)
n.flowController.Abort()
return n.sink.Close(ctx)
}
Loading

0 comments on commit 3066772

Please sign in to comment.