Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

latency(cdc): 2 phase scheduling processor bug fix #5728

Merged
merged 14 commits into from
Jun 6, 2022
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
status: tablepipeline.TableStateReplicating,
state: tablepipeline.TableStateReplicating,
resolvedTs: replicaInfo.StartTs,
checkpointTs: replicaInfo.StartTs,
}, nil
Expand Down
48 changes: 33 additions & 15 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,36 @@ const (

type sinkNode struct {
sink sink.Sink
status TableState
state *TableState
tableID model.TableID

// atomic oprations for model.ResolvedTs
// atomic operations for model.ResolvedTs
resolvedTs atomic.Value
checkpointTs model.Ts
targetTs model.Ts
barrierTs model.Ts

changefeed model.ChangeFeedID

flowController tableFlowController

replicaConfig *config.ReplicaConfig
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts,
targetTs model.Ts, flowController tableFlowController, state *TableState,
changefeed model.ChangeFeedID,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatePrepared,
state: state,
targetTs: targetTs,
checkpointTs: startTs,
barrierTs: startTs,

changefeed: changefeed,

flowController: flowController,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
Expand All @@ -67,7 +74,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableState { return n.status.Load() }
func (n *sinkNode) State() TableState { return n.state.Load() }

func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) {
n.replicaConfig = replicaConfig
Expand All @@ -77,13 +84,16 @@ func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) {
// In this method, the builtin table sink will be closed by calling `Close`, and
// no more events can be sent to this sink node afterwards.
func (n *sinkNode) stop(ctx context.Context) (err error) {
// table stopped status must be set after underlying sink is closed
defer n.status.Store(TableStateStopped)
// table stopped state must be set after underlying sink is closed
defer n.state.Store(TableStateStopped)
err = n.sink.Close(ctx)
if err != nil {
return
}
log.Info("sink is closed", zap.Int64("tableID", n.tableID))
log.Info("sink is closed",
zap.Int64("tableID", n.tableID),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID))
err = cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
return
}
Expand All @@ -93,7 +103,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) {
func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) {
defer func() {
if err != nil {
n.status.Store(TableStateStopped)
n.state.Store(TableStateStopped)
return
}
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
Expand Down Expand Up @@ -142,7 +152,11 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
})

if event == nil || event.Row == nil {
log.Warn("skip emit nil event", zap.Any("event", event))
log.Warn("skip emit nil event",
zap.Int64("tableID", n.tableID),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID),
zap.Any("event", event))
return nil
}

Expand All @@ -152,7 +166,11 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event", zap.Any("event", event))
log.Warn("skip emit empty row event",
zap.Int64("tableID", n.tableID),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID),
zap.Any("event", event))
return nil
}

Expand Down Expand Up @@ -240,15 +258,15 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv
}

func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) {
if n.status.Load() == TableStateStopped {
if n.state.Load() == TableStateStopped {
return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
}
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.IsResolved() {
if n.status.Load() == TableStatePrepared {
n.status.Store(TableStateReplicating)
if n.state.Load() == TableStatePrepared {
n.state.Store(TableStateReplicating)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
Expand Down Expand Up @@ -291,7 +309,7 @@ func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error {
}

func (n *sinkNode) releaseResource(ctx context.Context) error {
n.status.Store(TableStateStopped)
n.state.Store(TableStateStopped)
n.flowController.Abort()
return n.sink.Close(ctx)
}
Loading