diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 011e09f60f8..74c5513888c 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -768,30 +768,13 @@ func (m *SinkManager) UpdateReceivedSorterResolvedTs(span tablepb.Span, ts model
 }
 
 // UpdateBarrierTs update all tableSink's barrierTs in the SinkManager
-func (m *SinkManager) UpdateBarrierTs(
-	globalBarrierTs model.Ts,
-	tableBarrier map[model.TableID]model.Ts,
-) {
+func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map[model.TableID]model.Ts) {
 	m.tableSinks.Range(func(span tablepb.Span, value interface{}) bool {
-		tableSink := value.(*tableSinkWrapper)
-		lastBarrierTs := tableSink.barrierTs.Load()
-		// It is safe to do not use compare and swap here.
-		// Only the processor will update the barrier ts.
-		// Other goroutines will only read the barrier ts.
-		// So it is safe to do not use compare and swap here, just Load and Store.
-		if tableBarrierTs, ok := tableBarrier[tableSink.span.TableID]; ok {
-			barrierTs := tableBarrierTs
-			if barrierTs > globalBarrierTs {
-				barrierTs = globalBarrierTs
-			}
-			if barrierTs > lastBarrierTs {
-				tableSink.barrierTs.Store(barrierTs)
-			}
-		} else {
-			if globalBarrierTs > lastBarrierTs {
-				tableSink.barrierTs.Store(globalBarrierTs)
-			}
+		barrierTs := globalBarrierTs
+		if tableBarrierTs, ok := tableBarrier[span.TableID]; ok && tableBarrierTs < globalBarrierTs {
+			barrierTs = tableBarrierTs
 		}
+		value.(*tableSinkWrapper).updateBarrierTs(barrierTs)
 		return true
 	})
 }
@@ -1008,14 +991,14 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
 		resolvedTs = tableSink.getReceivedSorterResolvedTs()
 	}
 
-	if resolvedTs < checkpointTs.ResolvedMark() {
-		log.Error("sinkManager: resolved ts should not less than checkpoint ts",
+	sinkUpperBound := tableSink.getUpperBoundTs()
+	if sinkUpperBound < checkpointTs.ResolvedMark() {
+		log.Panic("sinkManager: sink upperbound should not less than checkpoint ts",
 			zap.String("namespace", m.changefeedID.Namespace),
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Stringer("span", &span),
-			zap.Uint64("resolvedTs", resolvedTs),
-			zap.Any("checkpointTs", checkpointTs),
-			zap.Uint64("barrierTs", tableSink.barrierTs.Load()))
+			zap.Uint64("upperbound", sinkUpperBound),
+			zap.Any("checkpointTs", checkpointTs))
 	}
 	return TableStats{
 		CheckpointTs: checkpointTs.ResolvedMark(),
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 8df10c51daa..2495a3b115d 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -179,6 +179,15 @@ func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEve
 	return nil
 }
 
+func (t *tableSinkWrapper) updateBarrierTs(ts model.Ts) {
+	for {
+		old := t.barrierTs.Load()
+		if ts <= old || t.barrierTs.CompareAndSwap(old, ts) {
+			break
+		}
+	}
+}
+
 func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
 	for {
 		old := t.receivedSorterResolvedTs.Load()
diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go
index e4dad8bb903..45390b9d244 100644
--- a/cdc/scheduler/internal/v3/agent/agent.go
+++ b/cdc/scheduler/internal/v3/agent/agent.go
@@ -206,10 +206,7 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) {
 		return nil, errors.Trace(err)
 	}
 
-	outboundMessages, barrier, err := a.handleMessage(inboundMessages)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
+	outboundMessages, barrier := a.handleMessage(inboundMessages)
 
 	responses, err := a.tableM.poll(ctx)
 	if err != nil {
@@ -237,9 +234,7 @@ func (a *agent) handleLivenessUpdate(liveness model.Liveness) {
 	}
 }
 
-func (a *agent) handleMessage(msg []*schedulepb.Message) (
-	result []*schedulepb.Message, barrier *schedulepb.Barrier, err error,
-) {
+func (a *agent) handleMessage(msg []*schedulepb.Message) (result []*schedulepb.Message, barrier *schedulepb.Barrier) {
 	for _, message := range msg {
 		ownerCaptureID := message.GetFrom()
 		header := message.GetHeader()
@@ -254,10 +249,7 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
 		switch message.GetMsgType() {
 		case schedulepb.MsgHeartbeat:
 			var reMsg *schedulepb.Message
-			reMsg, barrier, err = a.handleMessageHeartbeat(message.GetHeartbeat())
-			if err != nil {
-				return
-			}
+			reMsg, barrier = a.handleMessageHeartbeat(message.GetHeartbeat())
 			result = append(result, reMsg)
 		case schedulepb.MsgDispatchTableRequest:
 			a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch)
@@ -272,28 +264,24 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
 	return
 }
 
-func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (
-	*schedulepb.Message, *schedulepb.Barrier, error,
-) {
+func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (*schedulepb.Message, *schedulepb.Barrier) {
 	allTables := a.tableM.getAllTableSpans()
 	result := make([]tablepb.TableStatus, 0, allTables.Len())
-	isValidCheckpointTs := true
 	allTables.Ascend(func(span tablepb.Span, table *tableSpan) bool {
 		status := table.getTableSpanStatus(request.CollectStats)
-		isValidCheckpointTs = status.Checkpoint.CheckpointTs <= status.Checkpoint.ResolvedTs
+		if status.Checkpoint.CheckpointTs > status.Checkpoint.ResolvedTs {
+			log.Warn("schedulerv3: CheckpointTs is greater than ResolvedTs",
+				zap.String("namespace", a.ChangeFeedID.Namespace),
+				zap.String("changefeed", a.ChangeFeedID.ID),
+				zap.String("span", span.String()))
+		}
 		if table.task != nil && table.task.IsRemove {
 			status.State = tablepb.TableStateStopping
 		}
 		result = append(result, status)
-		return isValidCheckpointTs
+		return true
 	})
 
-	if !isValidCheckpointTs {
-		status := result[len(result)-1]
-		checkpointTs := status.Checkpoint.CheckpointTs
-		resolvedTs := status.Checkpoint.ResolvedTs
-		return nil, nil, errors.ErrInvalidCheckpointTs.GenWithStackByArgs(checkpointTs, resolvedTs)
-	}
 	for _, span := range request.GetSpans() {
 		if _, ok := allTables.Get(span); !ok {
 			status := a.tableM.getTableSpanStatus(span, request.CollectStats)
@@ -320,7 +308,7 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (
 		zap.String("changefeed", a.ChangeFeedID.ID),
 		zap.Any("message", message))
 
-	return message, request.GetBarrier(), nil
+	return message, request.GetBarrier()
 }
 
 type dispatchTableTaskStatus int32
diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go
index bc08d491f0c..1b9617b3d4d 100644
--- a/cdc/scheduler/internal/v3/agent/agent_test.go
+++ b/cdc/scheduler/internal/v3/agent/agent_test.go
@@ -356,7 +356,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
 		},
 	}
 
-	response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 1)
 	require.Equal(t, model.LivenessCaptureAlive, response[0].GetHeartbeatResponse().Liveness)
 
@@ -376,7 +376,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
 	}
 
 	a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).task = &dispatchTableTask{IsRemove: true}
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	result = response[0].GetHeartbeatResponse().Tables
 	sort.Slice(result, func(i, j int) bool {
 		return result[i].Span.TableID < result[j].Span.TableID
@@ -384,13 +384,13 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
 	require.Equal(t, tablepb.TableStateStopping, result[1].State)
 
 	a.handleLivenessUpdate(model.LivenessCaptureStopping)
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 1)
 	require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)
 
 	a.handleLivenessUpdate(model.LivenessCaptureAlive)
 	heartbeat.Heartbeat.IsStopping = true
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)
 	require.Equal(t, model.LivenessCaptureStopping, a.liveness.Load())
 }
@@ -577,7 +577,7 @@ func TestAgentHandleMessage(t *testing.T) {
 	}
 
 	// handle the first heartbeat, from the known owner.
-	response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 1)
 
 	addTableRequest := &schedulepb.Message{
@@ -600,17 +600,17 @@ func TestAgentHandleMessage(t *testing.T) {
 		},
 	}
 	// wrong epoch, ignored
-	responses, _, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
+	responses, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
 	require.False(t, tableM.tables.Has(spanz.TableIDToComparableSpan(1)))
 	require.Len(t, responses, 0)
 
 	// correct epoch, processing.
 	addTableRequest.Header.ProcessorEpoch = a.Epoch
-	_, _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
+	_, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
 	require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1)))
 
 	heartbeat.Header.OwnerRevision.Revision = 2
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 1)
 
 	// this should never happen in real world
@@ -624,12 +624,12 @@ func TestAgentHandleMessage(t *testing.T) {
 		From: a.ownerInfo.ID,
 	}
 
-	response, _, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
+	response, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
 	require.Len(t, response, 0)
 
 	// staled message
 	heartbeat.Header.OwnerRevision.Revision = 1
-	response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
+	response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
 	require.Len(t, response, 0)
 }
 
diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go
index fe98b3a41f2..9a33d460bbf 100644
--- a/cdc/scheduler/internal/v3/replication/replication_manager.go
+++ b/cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -512,6 +512,38 @@ func (r *Manager) AdvanceCheckpoint(
 	barrier *schedulepb.BarrierWithMinTs,
 	redoMetaManager redo.MetaManager,
 ) (newCheckpointTs, newResolvedTs model.Ts) {
+	var redoFlushedResolvedTs model.Ts
+	limitBarrierWithRedo := func(newCheckpointTs, newResolvedTs uint64) (uint64, uint64) {
+		flushedMeta := redoMetaManager.GetFlushedMeta()
+		redoFlushedResolvedTs = flushedMeta.ResolvedTs
+		log.Debug("owner gets flushed redo meta",
+			zap.String("namespace", r.changefeedID.Namespace),
+			zap.String("changefeed", r.changefeedID.ID),
+			zap.Uint64("flushedCheckpointTs", flushedMeta.CheckpointTs),
+			zap.Uint64("flushedResolvedTs", flushedMeta.ResolvedTs))
+		if flushedMeta.ResolvedTs < newResolvedTs {
+			newResolvedTs = flushedMeta.ResolvedTs
+		}
+
+		if newCheckpointTs > newResolvedTs {
+			newCheckpointTs = newResolvedTs
+		}
+
+		if barrier.GlobalBarrierTs > newResolvedTs {
+			barrier.GlobalBarrierTs = newResolvedTs
+		}
+		return newCheckpointTs, newResolvedTs
+	}
+	defer func() {
+		if redoFlushedResolvedTs != 0 && barrier.GlobalBarrierTs > redoFlushedResolvedTs {
+			log.Panic("barrierTs should never greater than redo flushed",
+				zap.String("namespace", r.changefeedID.Namespace),
+				zap.String("changefeed", r.changefeedID.ID),
+				zap.Uint64("barrierTs", barrier.GlobalBarrierTs),
+				zap.Uint64("redoFlushedResolvedTs", redoFlushedResolvedTs))
+		}
+	}()
+
 	newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
 	slowestRange := tablepb.Span{}
 	cannotProceed := false
@@ -573,6 +605,11 @@ func (r *Manager) AdvanceCheckpoint(
 		return true
 	})
 	if cannotProceed {
+		if redoMetaManager.Enabled() {
+			// If redo is enabled, GlobalBarrierTs should be limited by redo flushed meta.
+			newResolvedTs = barrier.RedoBarrierTs
+			limitBarrierWithRedo(newCheckpointTs, newResolvedTs)
+		}
 		return checkpointCannotProceed, checkpointCannotProceed
 	}
 	if slowestRange.TableID != 0 {
@@ -619,26 +656,12 @@ func (r *Manager) AdvanceCheckpoint(
 			newResolvedTs = barrier.RedoBarrierTs
 		}
 		redoMetaManager.UpdateMeta(newCheckpointTs, newResolvedTs)
-		flushedMeta := redoMetaManager.GetFlushedMeta()
-		flushedCheckpointTs, flushedResolvedTs := flushedMeta.CheckpointTs, flushedMeta.ResolvedTs
-		log.Debug("owner gets flushed meta",
-			zap.Uint64("flushedResolvedTs", flushedResolvedTs),
-			zap.Uint64("flushedCheckpointTs", flushedCheckpointTs),
-			zap.Uint64("newResolvedTs", newResolvedTs),
-			zap.Uint64("newCheckpointTs", newCheckpointTs),
+		log.Debug("owner updates redo meta",
 			zap.String("namespace", r.changefeedID.Namespace),
-			zap.String("changefeed", r.changefeedID.ID))
-		if flushedResolvedTs != 0 && flushedResolvedTs < newResolvedTs {
-			newResolvedTs = flushedResolvedTs
-		}
-
-		if newCheckpointTs > newResolvedTs {
-			newCheckpointTs = newResolvedTs
-		}
-
-		if barrier.GlobalBarrierTs > newResolvedTs {
-			barrier.GlobalBarrierTs = newResolvedTs
-		}
+			zap.String("changefeed", r.changefeedID.ID),
+			zap.Uint64("newCheckpointTs", newCheckpointTs),
+			zap.Uint64("newResolvedTs", newResolvedTs))
+		return limitBarrierWithRedo(newCheckpointTs, newResolvedTs)
 	}
 
 	return newCheckpointTs, newResolvedTs
diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go
index 692844659c2..0584e62972c 100644
--- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go
+++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go
@@ -766,6 +766,51 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
 	require.Equal(t, model.Ts(9), barrier.GetGlobalBarrierTs())
 }
 
+func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) {
+	t.Parallel()
+	r := NewReplicationManager(1, model.ChangeFeedID{})
+	span := spanz.TableIDToComparableSpan(1)
+	rs, err := NewReplicationSet(span, model.Ts(10),
+		map[model.CaptureID]*tablepb.TableStatus{
+			"1": {
+				Span: spanz.TableIDToComparableSpan(1),
+				State: tablepb.TableStateReplicating,
+				Checkpoint: tablepb.Checkpoint{
+					CheckpointTs: model.Ts(10),
+					ResolvedTs: model.Ts(20),
+				},
+			},
+		}, model.ChangeFeedID{})
+	require.NoError(t, err)
+	r.spans.ReplaceOrInsert(span, rs)
+
+	span2 := spanz.TableIDToComparableSpan(2)
+	rs, err = NewReplicationSet(span2, model.Ts(15),
+		map[model.CaptureID]*tablepb.TableStatus{
+			"2": {
+				Span: spanz.TableIDToComparableSpan(2),
+				State: tablepb.TableStateReplicating,
+				Checkpoint: tablepb.Checkpoint{
+					CheckpointTs: model.Ts(15),
+					ResolvedTs: model.Ts(30),
+				},
+			},
+		}, model.ChangeFeedID{})
+	require.NoError(t, err)
+	r.spans.ReplaceOrInsert(span2, rs)
+
+	redoMetaManager := &mockRedoMetaManager{enable: true, resolvedTs: 25}
+
+	// some table not exist yet with redo is enabled.
+	currentTables := &TableRanges{}
+	currentTables.UpdateTables([]model.TableID{1, 2, 3})
+	barrier := schedulepb.NewBarrierWithMinTs(30)
+	checkpoint, resolved := r.AdvanceCheckpoint(currentTables, time.Now(), barrier, redoMetaManager)
+	require.Equal(t, checkpointCannotProceed, checkpoint)
+	require.Equal(t, checkpointCannotProceed, resolved)
+	require.Equal(t, uint64(25), barrier.Barrier.GetGlobalBarrierTs())
+}
+
 func TestReplicationManagerHandleCaptureChanges(t *testing.T) {
 	t.Parallel()