sink(cdc): fix the check about resolvedTs and checkpointTs (#9772)
close #9769
hicqu authored Sep 20, 2023
1 parent 907d937 commit f5ecaf9
Showing 6 changed files with 128 additions and 80 deletions.
37 changes: 10 additions & 27 deletions cdc/processor/sinkmanager/manager.go
@@ -768,30 +768,13 @@ func (m *SinkManager) UpdateReceivedSorterResolvedTs(span tablepb.Span, ts model
 }
 
 // UpdateBarrierTs update all tableSink's barrierTs in the SinkManager
-func (m *SinkManager) UpdateBarrierTs(
-	globalBarrierTs model.Ts,
-	tableBarrier map[model.TableID]model.Ts,
-) {
+func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map[model.TableID]model.Ts) {
 	m.tableSinks.Range(func(span tablepb.Span, value interface{}) bool {
-		tableSink := value.(*tableSinkWrapper)
-		lastBarrierTs := tableSink.barrierTs.Load()
-		// It is safe to do not use compare and swap here.
-		// Only the processor will update the barrier ts.
-		// Other goroutines will only read the barrier ts.
-		// So it is safe to do not use compare and swap here, just Load and Store.
-		if tableBarrierTs, ok := tableBarrier[tableSink.span.TableID]; ok {
-			barrierTs := tableBarrierTs
-			if barrierTs > globalBarrierTs {
-				barrierTs = globalBarrierTs
-			}
-			if barrierTs > lastBarrierTs {
-				tableSink.barrierTs.Store(barrierTs)
-			}
-		} else {
-			if globalBarrierTs > lastBarrierTs {
-				tableSink.barrierTs.Store(globalBarrierTs)
-			}
+		barrierTs := globalBarrierTs
+		if tableBarrierTs, ok := tableBarrier[span.TableID]; ok && tableBarrierTs < globalBarrierTs {
+			barrierTs = tableBarrierTs
 		}
+		value.(*tableSinkWrapper).updateBarrierTs(barrierTs)
 		return true
 	})
 }
@@ -1008,14 +991,14 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
 		resolvedTs = tableSink.getReceivedSorterResolvedTs()
 	}
 
-	if resolvedTs < checkpointTs.ResolvedMark() {
-		log.Error("sinkManager: resolved ts should not less than checkpoint ts",
+	sinkUpperBound := tableSink.getUpperBoundTs()
+	if sinkUpperBound < checkpointTs.ResolvedMark() {
+		log.Panic("sinkManager: sink upperbound should not less than checkpoint ts",
 			zap.String("namespace", m.changefeedID.Namespace),
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Stringer("span", &span),
-			zap.Uint64("resolvedTs", resolvedTs),
-			zap.Any("checkpointTs", checkpointTs),
-			zap.Uint64("barrierTs", tableSink.barrierTs.Load()))
+			zap.Uint64("upperbound", sinkUpperBound),
+			zap.Any("checkpointTs", checkpointTs))
 	}
 	return TableStats{
 		CheckpointTs: checkpointTs.ResolvedMark(),
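
Condensed from the new UpdateBarrierTs body above, a minimal standalone sketch (plain uint64 timestamps and int64 table IDs, not the project's model types) of how the effective barrier is chosen for a table: the table-specific barrier applies only when it is lower than the global one.

```go
package main

import "fmt"

// effectiveBarrier mirrors the selection in UpdateBarrierTs: start from the
// global barrier and lower it only if a smaller table-specific barrier exists.
func effectiveBarrier(globalBarrierTs uint64, tableBarrier map[int64]uint64, tableID int64) uint64 {
	barrierTs := globalBarrierTs
	if tableBarrierTs, ok := tableBarrier[tableID]; ok && tableBarrierTs < globalBarrierTs {
		barrierTs = tableBarrierTs
	}
	return barrierTs
}

func main() {
	tableBarrier := map[int64]uint64{1: 90, 2: 120}
	fmt.Println(effectiveBarrier(100, tableBarrier, 1)) // 90: the table barrier is lower
	fmt.Println(effectiveBarrier(100, tableBarrier, 2)) // 100: the global barrier caps it
	fmt.Println(effectiveBarrier(100, tableBarrier, 3)) // 100: no table-specific barrier
}
```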
9 changes: 9 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -179,6 +179,15 @@ func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEve
 	return nil
 }
 
+func (t *tableSinkWrapper) updateBarrierTs(ts model.Ts) {
+	for {
+		old := t.barrierTs.Load()
+		if ts <= old || t.barrierTs.CompareAndSwap(old, ts) {
+			break
+		}
+	}
+}
+
 func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
 	for {
 		old := t.receivedSorterResolvedTs.Load()
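
The added updateBarrierTs is a monotonic compare-and-swap loop. A minimal self-contained sketch of the same pattern (generic sync/atomic, not the project's types), showing that the stored timestamp can only advance even when several goroutines race:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// advance installs candidate only if it is newer than the stored value,
// retrying when another goroutine wins the race in between.
func advance(ts *atomic.Uint64, candidate uint64) {
	for {
		old := ts.Load()
		if candidate <= old || ts.CompareAndSwap(old, candidate) {
			return
		}
	}
}

func main() {
	var barrierTs atomic.Uint64
	var wg sync.WaitGroup
	for _, ts := range []uint64{5, 3, 9, 7} {
		wg.Add(1)
		go func(ts uint64) {
			defer wg.Done()
			advance(&barrierTs, ts)
		}(ts)
	}
	wg.Wait()
	fmt.Println(barrierTs.Load()) // always 9, regardless of goroutine scheduling
}
```

Compared with the previous Load-and-Store in UpdateBarrierTs, the CAS loop stays correct even if more than one goroutine ever writes the barrier.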
36 changes: 12 additions & 24 deletions cdc/scheduler/internal/v3/agent/agent.go
@@ -206,10 +206,7 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) {
 		return nil, errors.Trace(err)
 	}
 
-	outboundMessages, barrier, err := a.handleMessage(inboundMessages)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
+	outboundMessages, barrier := a.handleMessage(inboundMessages)
 
 	responses, err := a.tableM.poll(ctx)
 	if err != nil {
@@ -237,9 +234,7 @@ func (a *agent) handleLivenessUpdate(liveness model.Liveness) {
 	}
 }
 
-func (a *agent) handleMessage(msg []*schedulepb.Message) (
-	result []*schedulepb.Message, barrier *schedulepb.Barrier, err error,
-) {
+func (a *agent) handleMessage(msg []*schedulepb.Message) (result []*schedulepb.Message, barrier *schedulepb.Barrier) {
 	for _, message := range msg {
 		ownerCaptureID := message.GetFrom()
 		header := message.GetHeader()
@@ -254,10 +249,7 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
 		switch message.GetMsgType() {
 		case schedulepb.MsgHeartbeat:
 			var reMsg *schedulepb.Message
-			reMsg, barrier, err = a.handleMessageHeartbeat(message.GetHeartbeat())
-			if err != nil {
-				return
-			}
+			reMsg, barrier = a.handleMessageHeartbeat(message.GetHeartbeat())
 			result = append(result, reMsg)
 		case schedulepb.MsgDispatchTableRequest:
 			a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch)
@@ -272,28 +264,24 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
 	return
 }
 
-func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (
-	*schedulepb.Message, *schedulepb.Barrier, error,
-) {
+func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (*schedulepb.Message, *schedulepb.Barrier) {
 	allTables := a.tableM.getAllTableSpans()
 	result := make([]tablepb.TableStatus, 0, allTables.Len())
 
-	isValidCheckpointTs := true
 	allTables.Ascend(func(span tablepb.Span, table *tableSpan) bool {
 		status := table.getTableSpanStatus(request.CollectStats)
-		isValidCheckpointTs = status.Checkpoint.CheckpointTs <= status.Checkpoint.ResolvedTs
+		if status.Checkpoint.CheckpointTs > status.Checkpoint.ResolvedTs {
+			log.Warn("schedulerv3: CheckpointTs is greater than ResolvedTs",
+				zap.String("namespace", a.ChangeFeedID.Namespace),
+				zap.String("changefeed", a.ChangeFeedID.ID),
+				zap.String("span", span.String()))
+		}
 		if table.task != nil && table.task.IsRemove {
 			status.State = tablepb.TableStateStopping
 		}
 		result = append(result, status)
-		return isValidCheckpointTs
+		return true
 	})
-	if !isValidCheckpointTs {
-		status := result[len(result)-1]
-		checkpointTs := status.Checkpoint.CheckpointTs
-		resolvedTs := status.Checkpoint.ResolvedTs
-		return nil, nil, errors.ErrInvalidCheckpointTs.GenWithStackByArgs(checkpointTs, resolvedTs)
-	}
 	for _, span := range request.GetSpans() {
 		if _, ok := allTables.Get(span); !ok {
 			status := a.tableM.getTableSpanStatus(span, request.CollectStats)
@@ -320,7 +308,7 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (
 		zap.String("changefeed", a.ChangeFeedID.ID),
 		zap.Any("message", message))
 
-	return message, request.GetBarrier(), nil
+	return message, request.GetBarrier()
 }
 
 type dispatchTableTaskStatus int32
20 changes: 10 additions & 10 deletions cdc/scheduler/internal/v3/agent/agent_test.go
@@ -356,7 +356,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
 		},
 	}
 
-	response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 1)
 	require.Equal(t, model.LivenessCaptureAlive, response[0].GetHeartbeatResponse().Liveness)
 
@@ -376,21 +376,21 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
 	}
 
 	a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).task = &dispatchTableTask{IsRemove: true}
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	result = response[0].GetHeartbeatResponse().Tables
 	sort.Slice(result, func(i, j int) bool {
 		return result[i].Span.TableID < result[j].Span.TableID
 	})
 	require.Equal(t, tablepb.TableStateStopping, result[1].State)
 
 	a.handleLivenessUpdate(model.LivenessCaptureStopping)
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 1)
 	require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)
 
 	a.handleLivenessUpdate(model.LivenessCaptureAlive)
 	heartbeat.Heartbeat.IsStopping = true
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)
 	require.Equal(t, model.LivenessCaptureStopping, a.liveness.Load())
 }
@@ -577,7 +577,7 @@ func TestAgentHandleMessage(t *testing.T) {
 	}
 
 	// handle the first heartbeat, from the known owner.
-	response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 1)
 
 	addTableRequest := &schedulepb.Message{
@@ -600,17 +600,17 @@ func TestAgentHandleMessage(t *testing.T) {
 		},
 	}
 	// wrong epoch, ignored
-	responses, _, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
+	responses, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
 	require.False(t, tableM.tables.Has(spanz.TableIDToComparableSpan(1)))
 	require.Len(t, responses, 0)
 
 	// correct epoch, processing.
 	addTableRequest.Header.ProcessorEpoch = a.Epoch
-	_, _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
+	_, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
 	require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1)))
 
 	heartbeat.Header.OwnerRevision.Revision = 2
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 1)
 
 	// this should never happen in real world
@@ -624,12 +624,12 @@ func TestAgentHandleMessage(t *testing.T) {
 		From: a.ownerInfo.ID,
 	}
 
-	response, _, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
+	response, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
 	require.Len(t, response, 0)
 
 	// staled message
 	heartbeat.Header.OwnerRevision.Revision = 1
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 0)
 }
 
61 changes: 42 additions & 19 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -512,6 +512,38 @@ func (r *Manager) AdvanceCheckpoint(
 	barrier *schedulepb.BarrierWithMinTs,
 	redoMetaManager redo.MetaManager,
 ) (newCheckpointTs, newResolvedTs model.Ts) {
+	var redoFlushedResolvedTs model.Ts
+	limitBarrierWithRedo := func(newCheckpointTs, newResolvedTs uint64) (uint64, uint64) {
+		flushedMeta := redoMetaManager.GetFlushedMeta()
+		redoFlushedResolvedTs = flushedMeta.ResolvedTs
+		log.Debug("owner gets flushed redo meta",
+			zap.String("namespace", r.changefeedID.Namespace),
+			zap.String("changefeed", r.changefeedID.ID),
+			zap.Uint64("flushedCheckpointTs", flushedMeta.CheckpointTs),
+			zap.Uint64("flushedResolvedTs", flushedMeta.ResolvedTs))
+		if flushedMeta.ResolvedTs < newResolvedTs {
+			newResolvedTs = flushedMeta.ResolvedTs
+		}
+
+		if newCheckpointTs > newResolvedTs {
+			newCheckpointTs = newResolvedTs
+		}
+
+		if barrier.GlobalBarrierTs > newResolvedTs {
+			barrier.GlobalBarrierTs = newResolvedTs
+		}
+		return newCheckpointTs, newResolvedTs
+	}
+	defer func() {
+		if redoFlushedResolvedTs != 0 && barrier.GlobalBarrierTs > redoFlushedResolvedTs {
+			log.Panic("barrierTs should never greater than redo flushed",
+				zap.String("namespace", r.changefeedID.Namespace),
+				zap.String("changefeed", r.changefeedID.ID),
+				zap.Uint64("barrierTs", barrier.GlobalBarrierTs),
+				zap.Uint64("redoFlushedResolvedTs", redoFlushedResolvedTs))
+		}
+	}()
+
 	newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
 	slowestRange := tablepb.Span{}
 	cannotProceed := false
@@ -573,6 +605,11 @@ func (r *Manager) AdvanceCheckpoint(
 		return true
 	})
 	if cannotProceed {
+		if redoMetaManager.Enabled() {
+			// If redo is enabled, GlobalBarrierTs should be limited by redo flushed meta.
+			newResolvedTs = barrier.RedoBarrierTs
+			limitBarrierWithRedo(newCheckpointTs, newResolvedTs)
+		}
 		return checkpointCannotProceed, checkpointCannotProceed
 	}
 	if slowestRange.TableID != 0 {
@@ -619,26 +656,12 @@ func (r *Manager) AdvanceCheckpoint(
 			newResolvedTs = barrier.RedoBarrierTs
 		}
 		redoMetaManager.UpdateMeta(newCheckpointTs, newResolvedTs)
-		flushedMeta := redoMetaManager.GetFlushedMeta()
-		flushedCheckpointTs, flushedResolvedTs := flushedMeta.CheckpointTs, flushedMeta.ResolvedTs
-		log.Debug("owner gets flushed meta",
-			zap.Uint64("flushedResolvedTs", flushedResolvedTs),
-			zap.Uint64("flushedCheckpointTs", flushedCheckpointTs),
-			zap.Uint64("newResolvedTs", newResolvedTs),
-			zap.Uint64("newCheckpointTs", newCheckpointTs),
+		log.Debug("owner updates redo meta",
 			zap.String("namespace", r.changefeedID.Namespace),
-			zap.String("changefeed", r.changefeedID.ID))
-		if flushedResolvedTs != 0 && flushedResolvedTs < newResolvedTs {
-			newResolvedTs = flushedResolvedTs
-		}
-
-		if newCheckpointTs > newResolvedTs {
-			newCheckpointTs = newResolvedTs
-		}
-
-		if barrier.GlobalBarrierTs > newResolvedTs {
-			barrier.GlobalBarrierTs = newResolvedTs
-		}
+			zap.String("changefeed", r.changefeedID.ID),
+			zap.Uint64("newCheckpointTs", newCheckpointTs),
+			zap.Uint64("newResolvedTs", newResolvedTs))
+		return limitBarrierWithRedo(newCheckpointTs, newResolvedTs)
 	}
 
 	return newCheckpointTs, newResolvedTs
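
Condensed from the limitBarrierWithRedo closure above, a standalone sketch (plain uint64s instead of the project's types) of the clamping order: the resolved ts is capped by the redo-flushed resolved ts, then the checkpoint ts and the global barrier ts are capped by that resolved ts — the invariant the deferred log.Panic check asserts.

```go
package main

import "fmt"

// limitWithRedo caps resolvedTs by what redo has flushed, then keeps
// checkpointTs and barrierTs from passing the (possibly lowered) resolvedTs.
func limitWithRedo(checkpointTs, resolvedTs, barrierTs, redoFlushedResolvedTs uint64) (uint64, uint64, uint64) {
	if resolvedTs > redoFlushedResolvedTs {
		resolvedTs = redoFlushedResolvedTs
	}
	if checkpointTs > resolvedTs {
		checkpointTs = resolvedTs
	}
	if barrierTs > resolvedTs {
		barrierTs = resolvedTs
	}
	return checkpointTs, resolvedTs, barrierTs
}

func main() {
	// Redo has only flushed up to ts=25, so resolved and barrier are capped at 25.
	fmt.Println(limitWithRedo(20, 30, 30, 25)) // 20 25 25
}
```

This matches the new test below, where a redo manager flushed at ts=25 forces the global barrier down to 25 even though the requested barrier is 30.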
45 changes: 45 additions & 0 deletions cdc/scheduler/internal/v3/replication/replication_manager_test.go
@@ -766,6 +766,51 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
 	require.Equal(t, model.Ts(9), barrier.GetGlobalBarrierTs())
 }
 
+func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) {
+	t.Parallel()
+	r := NewReplicationManager(1, model.ChangeFeedID{})
+	span := spanz.TableIDToComparableSpan(1)
+	rs, err := NewReplicationSet(span, model.Ts(10),
+		map[model.CaptureID]*tablepb.TableStatus{
+			"1": {
+				Span:  spanz.TableIDToComparableSpan(1),
+				State: tablepb.TableStateReplicating,
+				Checkpoint: tablepb.Checkpoint{
+					CheckpointTs: model.Ts(10),
+					ResolvedTs:   model.Ts(20),
+				},
+			},
+		}, model.ChangeFeedID{})
+	require.NoError(t, err)
+	r.spans.ReplaceOrInsert(span, rs)
+
+	span2 := spanz.TableIDToComparableSpan(2)
+	rs, err = NewReplicationSet(span2, model.Ts(15),
+		map[model.CaptureID]*tablepb.TableStatus{
+			"2": {
+				Span:  spanz.TableIDToComparableSpan(2),
+				State: tablepb.TableStateReplicating,
+				Checkpoint: tablepb.Checkpoint{
+					CheckpointTs: model.Ts(15),
+					ResolvedTs:   model.Ts(30),
+				},
+			},
+		}, model.ChangeFeedID{})
+	require.NoError(t, err)
+	r.spans.ReplaceOrInsert(span2, rs)
+
+	redoMetaManager := &mockRedoMetaManager{enable: true, resolvedTs: 25}
+
+	// some table not exist yet with redo is enabled.
+	currentTables := &TableRanges{}
+	currentTables.UpdateTables([]model.TableID{1, 2, 3})
+	barrier := schedulepb.NewBarrierWithMinTs(30)
+	checkpoint, resolved := r.AdvanceCheckpoint(currentTables, time.Now(), barrier, redoMetaManager)
+	require.Equal(t, checkpointCannotProceed, checkpoint)
+	require.Equal(t, checkpointCannotProceed, resolved)
+	require.Equal(t, uint64(25), barrier.Barrier.GetGlobalBarrierTs())
+}
+
 func TestReplicationManagerHandleCaptureChanges(t *testing.T) {
 	t.Parallel()
 
