sink(cdc): clean backends if table sink is stuck too long (#9527)
close #9542
hicqu authored Aug 11, 2023
1 parent 4738442 commit e99ba1a
Showing 23 changed files with 343 additions and 114 deletions.
7 changes: 7 additions & 0 deletions cdc/api/v2/model.go
@@ -409,6 +409,9 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.Sink.TxnAtomicity != nil {
res.Sink.TxnAtomicity = util.AddressOf(config.AtomicityLevel(*c.Sink.TxnAtomicity))
}
if c.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec)
}

}
if c.Mounter != nil {
@@ -637,6 +640,9 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
if cloned.Sink.TxnAtomicity != nil {
res.Sink.TxnAtomicity = util.AddressOf(string(*cloned.Sink.TxnAtomicity))
}
if cloned.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec)
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
@@ -788,6 +794,7 @@ type SinkConfig struct {
KafkaConfig *KafkaConfig `json:"kafka_config,omitempty"`
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"`
}

// CSVConfig denotes the csv config
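The new `advance_timeout` field is plumbed through both conversion directions, and each converter copies it only when the pointer is non-nil, so older clients that omit the field are unaffected. A minimal round-trip sketch, assuming the exported `ToInternalReplicaConfig` wrapper around the unexported converter shown above (the package aliases and the 150-second value are illustrative):

```go
package main

import (
	"fmt"

	v2 "github.com/pingcap/tiflow/cdc/api/v2"
	"github.com/pingcap/tiflow/pkg/util"
)

func main() {
	// API-side config with the new field set; 150 matches the default
	// the tests below expect.
	apiCfg := &v2.ReplicaConfig{
		Sink: &v2.SinkConfig{
			AdvanceTimeoutInSec: util.AddressOf(uint(150)),
		},
	}
	// Convert to the internal config and back; the value survives the
	// round trip because each direction copies it only when set.
	internal := apiCfg.ToInternalReplicaConfig()
	fmt.Println(*v2.ToAPIReplicaConfig(internal).Sink.AdvanceTimeoutInSec) // 150
}
```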
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
@@ -58,6 +58,7 @@ var defaultAPIConfig = &ReplicaConfig{
EnableKafkaSinkV2: util.AddressOf(false),
OnlyOutputUpdatedColumns: util.AddressOf(false),
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
},
Consistent: &ConsistentConfig{
Level: "none",
3 changes: 2 additions & 1 deletion cdc/processor/processor_test.go
@@ -134,7 +134,8 @@ func initProcessor4Test(
},
"sink": {
"dispatchers": null,
"protocol": "open-protocol"
"protocol": "open-protocol",
"advance-timeout-in-sec": 150
}
},
"state": "normal",
142 changes: 100 additions & 42 deletions cdc/processor/sinkmanager/manager.go
@@ -50,8 +50,6 @@ const (
// engine.CleanByTable can be expensive. So it's necessary to reduce useless calls.
cleanTableInterval = 5 * time.Second
cleanTableMinEvents = 128
maxRetryDuration = 30 * time.Minute
errGCInterval = 10 * time.Minute
)

// TableStats of a table sink.
@@ -86,8 +84,15 @@ type SinkManager struct {
sourceManager *sourcemanager.SourceManager

// sinkFactory used to create table sink.
sinkFactory *factory.SinkFactory
sinkFactoryMu sync.Mutex
sinkFactory struct {
sync.Mutex
f *factory.SinkFactory
// Whenever we want to create a new factory, version will be increased and
// errors will be replaced by a new channel. version is used to distinguish
// different sink factories in table sinks.
version uint64
errors chan error
}

// tableSinks is a map from tableID to tableSink.
tableSinks spanz.SyncMap
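The bare `sinkFactory *factory.SinkFactory` guarded by a separate mutex is replaced by one struct that bundles the factory with a version counter and a per-incarnation error channel. The version lets a table sink created by factory N report errors tagged with N, and lets the manager drop reports that arrive after factory N has already been torn down. A self-contained sketch of the pattern (names are illustrative, not from the tiflow codebase):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// versioned bundles a resource with the error channel of its current
// incarnation. Both are replaced together, and version is bumped, every
// time the resource is recreated.
type versioned[T any] struct {
	sync.Mutex
	res     *T
	version uint64
	errors  chan error
}

// reportError delivers err only if it was produced by the current
// incarnation; stale errors from an already-closed resource are dropped
// instead of triggering another restart.
func (v *versioned[T]) reportError(err error, version uint64) bool {
	v.Lock()
	defer v.Unlock()
	if version != v.version {
		return false
	}
	select {
	case v.errors <- err:
		return true
	default: // channel full; drop rather than block under the lock
		return false
	}
}

func main() {
	v := &versioned[string]{version: 2, errors: make(chan error, 16)}
	fmt.Println(v.reportError(errors.New("stale"), 1)) // false: old incarnation
	fmt.Println(v.reportError(errors.New("live"), 2))  // true
}
```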
@@ -190,7 +195,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
enableOldValue := m.changefeedInfo.Config.EnableOldValue

gcErrors := make(chan error, 16)
sinkFactoryErrors := make(chan error, 16)
sinkErrors := make(chan error, 16)
redoErrors := make(chan error, 16)

@@ -244,12 +248,7 @@

// SinkManager will restart some internal modules if necessary.
for {
if err := m.initSinkFactory(sinkFactoryErrors); err != nil {
select {
case <-m.managerCtx.Done():
case sinkFactoryErrors <- err:
}
}
sinkFactoryErrors, sinkFactoryVersion := m.initSinkFactory()

select {
case <-m.managerCtx.Done():
@@ -264,9 +263,9 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
log.Warn("Sink manager backend sink fails",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", sinkFactoryVersion),
zap.Error(err))
m.clearSinkFactory()
sinkFactoryErrors = make(chan error, 16)

start := time.Now()
log.Info("Sink manager is closing all table sinks",
@@ -304,48 +303,91 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) initSinkFactory(errCh chan error) error {
m.sinkFactoryMu.Lock()
defer m.sinkFactoryMu.Unlock()
if m.sinkFactory != nil {
return nil
}
func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
uri := m.changefeedInfo.SinkURI
cfg := m.changefeedInfo.Config

if m.sinkFactory.f != nil {
return m.sinkFactory.errors, m.sinkFactory.version
}
if m.sinkFactory.errors == nil {
m.sinkFactory.errors = make(chan error, 16)
m.sinkFactory.version += 1
}

emitError := func(err error) {
select {
case <-m.managerCtx.Done():
case m.sinkFactory.errors <- err:
}
}

var err error = nil
failpoint.Inject("SinkManagerRunError", func() {
log.Info("failpoint SinkManagerRunError injected", zap.String("changefeed", m.changefeedID.ID))
err = errors.New("SinkManagerRunError")
})
if err != nil {
return errors.Trace(err)
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
}

if m.sinkFactory, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, errCh); err == nil {
log.Info("Sink manager inits sink factory success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
return nil
m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors)
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
}
return errors.Trace(err)

log.Info("Sink manager inits sink factory success",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version))
return m.sinkFactory.errors, m.sinkFactory.version
}

func (m *SinkManager) clearSinkFactory() {
m.sinkFactoryMu.Lock()
defer m.sinkFactoryMu.Unlock()
if m.sinkFactory != nil {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
if m.sinkFactory.f != nil {
log.Info("Sink manager closing sink factory",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
m.sinkFactory.Close()
m.sinkFactory = nil
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version))
m.sinkFactory.f.Close()
m.sinkFactory.f = nil
log.Info("Sink manager has closed sink factory",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("factoryVersion", m.sinkFactory.version))
}
if m.sinkFactory.errors != nil {
close(m.sinkFactory.errors)
for range m.sinkFactory.errors {
}
m.sinkFactory.errors = nil
}
}

func (m *SinkManager) putSinkFactoryError(err error, version uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
skipped := true
if version == m.sinkFactory.version {
select {
case m.sinkFactory.errors <- err:
skipped = false
default:
}
}
log.Info("Sink manager tries to put an sink error",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Bool("skipped", skipped),
zap.String("error", err.Error()))
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) {
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, m.sourceManager,
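Note the order in `clearSinkFactory` above: the factory, which is the main producer on the error channel, is closed first; the only remaining producer, `putSinkFactoryError`, checks the version under the same mutex. Only then is the channel closed and drained, so errors buffered before the factory died cannot leak into the next iteration of the restart loop in `Run`. A minimal illustration of the close-then-drain idiom (the channel and error are stand-ins):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	errCh := make(chan error, 16)
	errCh <- errors.New("buffered error from the old sink backend")

	// Close first (no sender can race: all remaining sends are gated
	// elsewhere), then drain so buffered errors cannot be mistaken for
	// fresh ones by the next consumer.
	close(errCh)
	n := 0
	for range errCh {
		n++
	}
	fmt.Println("discarded", n, "stale error(s)") // discarded 1 stale error(s)
}
```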
@@ -391,7 +433,7 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
if time.Since(sink.lastCleanTime) < cleanTableInterval {
return true
}
checkpointTs := sink.getCheckpointTs()
checkpointTs, _, _ := sink.getCheckpointTs()
resolvedMark := checkpointTs.ResolvedMark()
if resolvedMark == 0 {
return true
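`getCheckpointTs` on the table sink wrapper now returns three values instead of one. The wrapper itself lives in `table_sink_wrapper.go`, which is not part of this excerpt; the following stand-in is inferred from the call sites in this diff, and its field names and bodies are assumptions:

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/pingcap/tiflow/cdc/model"
)

// Inferred stand-in for the real tableSinkWrapper (the actual type is in
// cdc/processor/sinkmanager/table_sink_wrapper.go, not shown here).
type tableSinkWrapper struct {
	mu           sync.Mutex
	checkpointTs model.ResolvedTs
	version      uint64    // sink-factory version that created the sink; 0 if none yet
	advanced     time.Time // last wall-clock time the checkpoint moved forward
}

func (t *tableSinkWrapper) getCheckpointTs() (model.ResolvedTs, uint64, time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.checkpointTs, t.version, t.advanced
}

func main() {
	w := &tableSinkWrapper{
		checkpointTs: model.NewResolvedTs(42),
		version:      1,
		advanced:     time.Now(),
	}
	ckpt, version, advanced := w.getCheckpointTs()
	fmt.Println(ckpt.ResolvedMark(), version, advanced.IsZero()) // 42 1 false
}
```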
@@ -756,14 +798,15 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
span,
func() tablesink.TableSink {
if m.sinkFactoryMu.TryLock() {
defer m.sinkFactoryMu.Unlock()
if m.sinkFactory != nil {
return m.sinkFactory.CreateTableSink(m.changefeedID, span, startTs, m.metricsTableSinkTotalRows)
func() (s tablesink.TableSink, version uint64) {
if m.sinkFactory.TryLock() {
defer m.sinkFactory.Unlock()
if m.sinkFactory.f != nil {
s = m.sinkFactory.f.CreateTableSink(m.changefeedID, span, startTs, m.metricsTableSinkTotalRows)
version = m.sinkFactory.version
}
}
return nil
return
},
tablepb.TableStatePreparing,
startTs,
@@ -862,12 +905,12 @@ func (m *SinkManager) RemoveTable(span tablepb.Span) {
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span))
}
sink := value.(*tableSinkWrapper)
checkpointTs, _, _ := value.(*tableSinkWrapper).getCheckpointTs()
log.Info("Remove table sink successfully",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("checkpointTs", sink.getCheckpointTs().Ts))
zap.Uint64("checkpointTs", checkpointTs.Ts))
if m.eventCache != nil {
m.eventCache.removeTable(span)
}
@@ -931,9 +974,24 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
}
tableSink := value.(*tableSinkWrapper)

checkpointTs := tableSink.getCheckpointTs()
checkpointTs, version, advanced := tableSink.getCheckpointTs()
m.sinkMemQuota.Release(span, checkpointTs)
m.redoMemQuota.Release(span, checkpointTs)

stuckCheck := time.Duration(*m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec) * time.Second
if version > 0 && time.Since(advanced) > stuckCheck &&
oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
tableSink.updateTableSinkAdvanced()
m.putSinkFactoryError(errors.New("table sink stuck"), version)
}

var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
if m.redoDMLMgr != nil {
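The stuck check in `GetTableStats` fires only when all three conditions hold: a real table sink exists (`version > 0`), the checkpoint has not advanced for more than `AdvanceTimeoutInSec` of wall time, and the checkpoint also lags the table's upper bound by more than the same duration in TSO time. The TSO-side comparison relies on `oracle.GetTimeFromTS`, which decodes the physical millisecond clock packed into a TiDB timestamp. A worked sketch of that comparison, with made-up timestamps built via `oracle.ComposeTS`:

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	// A TiDB TSO packs a physical unix-millisecond clock into its high
	// bits; ComposeTS and GetTimeFromTS convert in both directions.
	base := time.Now()
	checkpointTs := oracle.ComposeTS(base.UnixMilli(), 0)
	upperBoundTs := oracle.ComposeTS(base.Add(200*time.Second).UnixMilli(), 0)

	stuckCheck := time.Duration(150) * time.Second // Sink.AdvanceTimeoutInSec = 150
	lag := oracle.GetTimeFromTS(upperBoundTs).Sub(oracle.GetTimeFromTS(checkpointTs))
	fmt.Printf("lag=%v stuck=%v\n", lag, lag > stuckCheck) // lag=3m20s stuck=true
}
```

When the check fires, the manager injects a synthetic "table sink stuck" error tagged with the current factory version through `putSinkFactoryError`; it travels down the same channel as real backend errors, so the restart loop in `Run` tears the factory down and rebuilds it.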
7 changes: 4 additions & 3 deletions cdc/processor/sinkmanager/manager_test.go
@@ -197,7 +197,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
return checkpointTS.ResolvedMark() == 4
}, 5*time.Second, 10*time.Millisecond)
}
@@ -228,7 +228,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
return checkpointTS.ResolvedMark() == 3
}, 5*time.Second, 10*time.Millisecond)
}
@@ -283,7 +283,8 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
require.NotNil(t, tableSink)
require.Equal(t, uint64(1), tableSink.(*tableSinkWrapper).getCheckpointTs().Ts)
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
require.Equal(t, uint64(1), checkpointTS.Ts)
}

func TestClose(t *testing.T) {