Commit
puller(ticdc): fix wrong update splitting behavior after table schedu…
ti-chi-bot authored Jun 11, 2024
1 parent 2a28078 commit 3426e46
Showing 9 changed files with 76 additions and 95 deletions.
18 changes: 18 additions & 0 deletions cdc/model/kv.go
@@ -16,6 +16,7 @@
package model

import (
"errors"
"fmt"

"github.com/pingcap/tiflow/pkg/regionspan"
@@ -110,3 +111,20 @@ func (v *RawKVEntry) String() string {
func (v *RawKVEntry) ApproximateDataSize() int64 {
return int64(len(v.Key) + len(v.Value) + len(v.OldValue))
}

// ShouldSplitKVEntry checks whether the raw kv entry should be split.
type ShouldSplitKVEntry func(raw *RawKVEntry) bool

// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
if raw == nil {
return nil, nil, errors.New("nil event cannot be split")
}
deleteKVEntry := *raw
deleteKVEntry.Value = nil

insertKVEntry := *raw
insertKVEntry.OldValue = nil

return &deleteKVEntry, &insertKVEntry, nil
}
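
The two helpers above are the core of the fix: a ShouldSplitKVEntry predicate decides per event whether an update needs splitting, and SplitUpdateKVEntry produces the delete and insert halves. As a rough illustration only (not code from this commit), a consumer might wire them up as below; emitSplit is a hypothetical helper, and the delete-before-insert ordering reflects what a MySQL-compatible sink in safe mode needs:

package sketch

import "github.com/pingcap/tiflow/cdc/model"

// emitSplit is a hypothetical consumer of the new helpers: when the predicate
// says an update must be split, the delete half is emitted before the insert
// half, so a MySQL-compatible sink running in safe mode removes the old
// unique-key value before writing the new one.
func emitSplit(raw *model.RawKVEntry, shouldSplit model.ShouldSplitKVEntry, emit func(*model.RawKVEntry)) error {
    isUpdate := raw.OpType == model.OpTypePut && len(raw.Value) > 0 && len(raw.OldValue) > 0
    if !isUpdate || !shouldSplit(raw) {
        emit(raw)
        return nil
    }
    deleteEntry, insertEntry, err := model.SplitUpdateKVEntry(raw)
    if err != nil {
        return err
    }
    emit(deleteEntry) // carries only OldValue after the split
    emit(insertEntry) // carries only Value after the split
    return nil
}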
33 changes: 15 additions & 18 deletions cdc/processor/processor.go
@@ -219,11 +219,11 @@ func (p *processor) AddTable(
}

if p.pullBasedSinking {
p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
table := p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
if p.redoDMLMgr.Enabled() {
p.redoDMLMgr.AddTable(tableID, startTs)
}
p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs)
p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs, table.GetReplicaTs)
} else {
table, err := p.createTablePipeline(
ctx.(cdcContext.Context), tableID, &model.TableReplicaInfo{StartTs: startTs})
@@ -232,7 +232,6 @@
}
p.tables[tableID] = table
}

return true, nil
}

@@ -570,18 +569,6 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older than the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts,
@@ -757,6 +744,16 @@ func (p *processor) createTaskPosition() (skipThisTick bool) {
return true
}

// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible.
func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
if p.initialized {
@@ -847,16 +844,16 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.changefeed.Info.SinkURI)
isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg,
sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, pullerSafeModeAtStart)
sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, isMysqlBackend)
p.sinkManager, err = sinkmanager.New(stdCtx, p.changefeedID,
p.changefeed.Info, p.upstream, p.schemaStorage,
p.redoDMLMgr, p.sourceManager,
p.errCh, p.warnCh, p.metricsTableSinkTotalRows, p.metricsTableSinkFlushLagDuration)
p.errCh, p.warnCh, isMysqlBackend, p.metricsTableSinkTotalRows, p.metricsTableSinkFlushLagDuration)
if err != nil {
log.Info("Processor creates sink manager fail",
zap.String("namespace", p.changefeedID.Namespace),
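The AddTable changes above explain why SinkManager.AddTable now returns the table sink wrapper: the processor forwards the wrapper's GetReplicaTs getter to the source manager, so the puller side can consult the live replicateTs instead of a flag computed once at changefeed start (the removed needPullerSafeModeAtStart). A hedged sketch of how such a getter could be turned into the split predicate; makeShouldSplit and the exact comparison are illustrative assumptions, not the source manager's real code:

package sketch

import "github.com/pingcap/tiflow/cdc/model"

// makeShouldSplit builds a split predicate from a replicateTs getter such as
// tableSinkWrapper.GetReplicaTs. Because the getter is consulted per event,
// a table that is rescheduled onto this processor keeps splitting updates
// that committed before its *new* replicateTs. Hypothetical helper; the real
// wiring lives in the source manager and puller.
func makeShouldSplit(getReplicaTs func() model.Ts) model.ShouldSplitKVEntry {
    return func(raw *model.RawKVEntry) bool {
        if raw == nil || raw.OpType != model.OpTypePut {
            return false
        }
        isUpdate := len(raw.Value) > 0 && len(raw.OldValue) > 0
        return isUpdate && raw.CRTs < getReplicaTs() // comparison assumed, not taken from this diff
    }
}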
14 changes: 12 additions & 2 deletions cdc/processor/sinkmanager/manager.go
@@ -128,6 +128,9 @@ type SinkManager struct {
// wg is used to wait for all workers to exit.
wg sync.WaitGroup

// isMysqlBackend indicates whether the backend is MySQL compatible.
isMysqlBackend bool

// Metric for table sink.
metricsTableSinkTotalRows prometheus.Counter

@@ -145,6 +148,7 @@ func New(
sourceManager *sourcemanager.SourceManager,
errChan chan error,
warnChan chan error,
isMysqlBackend bool,
metricsTableSinkTotalRows prometheus.Counter,
metricsTableSinkFlushLagDuration prometheus.Observer,
) (*SinkManager, error) {
@@ -160,6 +164,7 @@
sinkTaskChan: make(chan *sinkTask),
sinkWorkerAvailable: make(chan struct{}, 1),
sinkRetry: retry.NewInfiniteErrorRetry(),
isMysqlBackend: isMysqlBackend,

metricsTableSinkTotalRows: metricsTableSinkTotalRows,

@@ -312,6 +317,11 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}

if m.isMysqlBackend {
// For the MySQL backend, we should restart the sink and let the owner handle the error.
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
@@ -848,7 +858,7 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map
}

// AddTable adds a table(TableSink) to the sink manager.
func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs model.Ts) {
func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
tableID,
@@ -876,7 +886,6 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
return
}
m.sinkMemQuota.AddTable(tableID)
m.redoMemQuota.AddTable(tableID)
@@ -886,6 +895,7 @@
zap.Int64("tableID", tableID),
zap.Uint64("startTs", startTs),
zap.Uint64("version", sinkWrapper.version))
return sinkWrapper
}

// StartTable sets the table(TableSink) state to replicating.
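The isMysqlBackend branch added to SinkManager.run changes the retry policy: duplicate-entry errors, and for MySQL-compatible backends seemingly any sink error reaching that branch, are returned so the owner restarts the changefeed, which in turn re-fetches replicateTs (see tableSinkWrapper.restart below). A minimal sketch of that decision, assuming the shape of the check but not the surrounding retry loop:

package sketch

import cerror "github.com/pingcap/tiflow/pkg/errors"

// shouldBubbleUp reports whether a sink error has to be returned to the owner
// instead of being retried inside the sink manager. This is only a sketch of
// the decision made in SinkManager.run, not the actual retry loop.
func shouldBubbleUp(err error, isMysqlBackend bool) bool {
    if err == nil {
        return false
    }
    if cerror.IsDupEntryError(err) {
        // Duplicate entry means downstream data may already diverge; never retry here.
        return true
    }
    // For MySQL-compatible backends, letting the owner restart the sink also
    // regenerates replicateTs, so pending old updates are split again in safe mode.
    return isMysqlBackend
}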
3 changes: 2 additions & 1 deletion cdc/processor/sinkmanager/manager_test.go
@@ -63,6 +63,7 @@ func createManagerWithMemEngine(
&entry.MockSchemaStorage{Resolved: math.MaxUint64},
nil, sm,
errChan, errChan,
false,
prometheus.NewCounter(prometheus.CounterOpts{}),
prometheus.NewHistogram(prometheus.HistogramOpts{}))
require.NoError(t, err)
@@ -166,7 +167,7 @@ func TestAddTable(t *testing.T) {
require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
err := manager.StartTable(tableID, 1)
require.NoError(t, err)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load())

progress := manager.sinkProgressHeap.pop()
require.Equal(t, tableID, progress.tableID)
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/redo_log_worker.go
@@ -276,8 +276,8 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
var x []*model.RowChangedEvent
var size uint64
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
x, size = handleRowChangedEvents(w.changefeedID, task.tableID, e)
usedMemSize += size
rows = append(rows, x...)
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
@@ -363,7 +363,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// NOTICE: The event can be filtered by the event filter.
if e.Row != nil {
// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
e.Row.ReplicatingTs = task.tableSink.replicateTs
e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
x, size := handleRowChangedEvents(w.changefeedID, task.tableID, e)
events = append(events, x...)
allEventSize += size
25 changes: 18 additions & 7 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -15,6 +15,7 @@ package sinkmanager

import (
"context"
"math"
"sort"
"sync"
"sync/atomic"
@@ -76,7 +77,7 @@ type tableSinkWrapper struct {
receivedSorterResolvedTs atomic.Uint64

// replicateTs is the ts that the table sink has started to replicate.
replicateTs model.Ts
replicateTs atomic.Uint64
genReplicateTs func(ctx context.Context) (model.Ts, error)

// lastCleanTime indicates the last time the table has been cleaned.
@@ -89,6 +90,11 @@
rangeEventCountsMu sync.Mutex
}

// GetReplicaTs returns the replicate ts of the table sink.
func (t *tableSinkWrapper) GetReplicaTs() model.Ts {
return t.replicateTs.Load()
}

type rangeEventCount struct {
// firstPos and lastPos are used to merge many rangeEventCount into one.
firstPos engine.Position
@@ -131,31 +137,34 @@ func newTableSinkWrapper(

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
res.replicateTs.Store(math.MaxUint64)
return res
}

func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
if t.replicateTs != 0 {
if t.replicateTs.Load() != math.MaxUint64 {
log.Panic("The table sink has already started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Int64("tableID", t.tableID),
zap.Uint64("startTs", startTs),
zap.Uint64("oldReplicateTs", t.replicateTs),
zap.Uint64("oldReplicateTs", t.replicateTs.Load()),
)
}

// FIXME(qupeng): it can be re-fetched later instead of fails.
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)

log.Info("Sink is started",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Int64("tableID", t.tableID),
zap.Uint64("startTs", startTs),
zap.Uint64("replicateTs", t.replicateTs),
zap.Uint64("replicateTs", ts),
)

// This start ts maybe greater than the initial start ts of the table sink.
@@ -378,14 +387,16 @@ func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
// committed at downstream but we don't know. So we need to update `replicateTs`
// of the table so that we can re-send those events later.
func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
ts, err := t.genReplicateTs(ctx)
if err != nil {
return errors.Trace(err)
}
t.replicateTs.Store(ts)
log.Info("Sink is restarted",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Int64("tableID", t.tableID),
zap.Uint64("replicateTs", t.replicateTs))
zap.Uint64("replicateTs", ts))
return nil
}

Expand Down
