From f35b76a1fe24ab354cdbe869d37951d59ab14695 Mon Sep 17 00:00:00 2001
From: qupeng
Date: Thu, 23 Nov 2023 16:11:42 +0800
Subject: [PATCH] sink(cdc): always handle sink failures for cases with
 sync-point enabled (#10132)

close pingcap/tiflow#10091
---
 cdc/processor/sinkmanager/manager.go          | 45 +++++++++++++++++--
 cdc/processor/sinkmanager/manager_test.go     | 42 +++++++++++++++++
 .../sinkmanager/table_sink_worker.go          | 19 ++------
 .../sinkmanager/table_sink_wrapper.go         | 14 ++++++
 .../dmlsink/blackhole/black_hole_dml_sink.go  | 18 +++++---
 cdc/sink/tablesink/table_sink.go              |  2 +
 cdc/sink/tablesink/table_sink_impl.go         |  8 ++++
 7 files changed, 121 insertions(+), 27 deletions(-)

diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 74c5513888c..a2ae9c22ed8 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -531,13 +531,50 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
 		slowestTableProgress := progs[i]
 		lowerBound := slowestTableProgress.nextLowerBoundPos
 		upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
-		// The table has no available progress.
-		if lowerBound.Compare(upperBound) >= 0 {
+
+		if !tableSink.initTableSink() {
+			// The table hasn't been attached to a sink.
 			m.sinkProgressHeap.push(slowestTableProgress)
 			continue
 		}
-		// The table hasn't been attached to a sink.
-		if !tableSink.initTableSink() {
+
+		if sinkErr := tableSink.checkTableSinkHealth(); sinkErr != nil {
+			switch errors.Cause(sinkErr).(type) {
+			case tablesink.SinkInternalError:
+				tableSink.closeAndClearTableSink()
+				if restartErr := tableSink.restart(ctx); restartErr == nil {
+					// Restart the table sink based on the checkpoint position.
+					ckpt := tableSink.getCheckpointTs().ResolvedMark()
+					lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
+					p := &progress{
+						span:              tableSink.span,
+						nextLowerBoundPos: lastWrittenPos.Next(),
+						version:           slowestTableProgress.version,
+					}
+					m.sinkProgressHeap.push(p)
+					log.Info("table sink has been restarted",
+						zap.String("namespace", m.changefeedID.Namespace),
+						zap.String("changefeed", m.changefeedID.ID),
+						zap.Stringer("span", &tableSink.span),
+						zap.Any("lastWrittenPos", lastWrittenPos),
+						zap.String("sinkError", sinkErr.Error()))
+				} else {
+					m.sinkProgressHeap.push(slowestTableProgress)
+					log.Warn("table sink restart fail",
+						zap.String("namespace", m.changefeedID.Namespace),
+						zap.String("changefeed", m.changefeedID.ID),
+						zap.Stringer("span", &tableSink.span),
+						zap.String("sinkError", sinkErr.Error()),
+						zap.Error(restartErr))
+				}
+			default:
+				return sinkErr
+			}
+			continue
+		}
+
+		// The table has no available progress.
+		if lowerBound.Compare(upperBound) >= 0 {
 			m.sinkProgressHeap.push(slowestTableProgress)
 			continue
 		}
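The restart branch above derives the new scan lower bound purely from the checkpoint's resolved mark: lastWrittenPos is {StartTs: ckpt - 1, CommitTs: ckpt}, and scanning resumes at lastWrittenPos.Next(). Below is a minimal, self-contained sketch of that arithmetic; the Position type is a simplified stand-in for engine.Position, and Next() is assumed to advance StartTs by one, which matches the {StartTs: 2, CommitTs: 2} lower bound the new test expects for a checkpoint of 2.

    package main

    import "fmt"

    // Position is a simplified stand-in for the sorter engine's
    // (StartTs, CommitTs) position; illustration only.
    type Position struct {
        StartTs, CommitTs uint64
    }

    // Next assumes the engine advances StartTs by one within the same commit.
    func (p Position) Next() Position {
        return Position{StartTs: p.StartTs + 1, CommitTs: p.CommitTs}
    }

    func main() {
        ckpt := uint64(2) // the checkpoint's resolved mark after a restart
        lastWrittenPos := Position{StartTs: ckpt - 1, CommitTs: ckpt}
        // Resume scanning just past the last written position:
        // prints {2 2}, the lower bound the test below expects.
        fmt.Println(lastWrittenPos.Next())
    }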
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index b70e17fc3ab..b1447fe19a9 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -372,3 +372,45 @@ func TestSinkManagerNeedsStuckCheck(t *testing.T) {
 
 	require.False(t, manager.needsStuckCheck())
 }
+
+func TestSinkManagerRestartTableSinks(t *testing.T) {
+	failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
+	defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	errCh := make(chan error, 16)
+	changefeedInfo := getChangefeedInfo()
+	manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.ChangeFeedID{}, changefeedInfo, errCh)
+	defer func() {
+		cancel()
+		manager.Close()
+	}()
+
+	span := tablepb.Span{TableID: 1}
+	manager.AddTable(span, 1, 100)
+	require.Nil(t, manager.StartTable(span, 2))
+	table, exists := manager.tableSinks.Load(span)
+	require.True(t, exists)
+
+	table.(*tableSinkWrapper).updateReceivedSorterResolvedTs(4)
+	table.(*tableSinkWrapper).updateBarrierTs(4)
+	select {
+	case task := <-manager.sinkTaskChan:
+		require.Equal(t, engine.Position{StartTs: 0, CommitTs: 3}, task.lowerBound)
+		task.callback(engine.Position{StartTs: 3, CommitTs: 4})
+	case <-time.After(2 * time.Second):
+		panic("should always get a sink task")
+	}
+
+	// With the failpoint blackhole/WriteEventsFail enabled, the sink manager should restart
+	// the table sink at its checkpoint.
+	failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail", "1*return")
+	defer failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail")
+	select {
+	case task := <-manager.sinkTaskChan:
+		require.Equal(t, engine.Position{StartTs: 2, CommitTs: 2}, task.lowerBound)
+		task.callback(engine.Position{StartTs: 3, CommitTs: 4})
+	case <-time.After(2 * time.Second):
+		panic("should always get a sink task")
+	}
+}
diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index b04ce8f7c40..42c493f3918 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -96,6 +96,7 @@ func newSinkWorker(
 }
 
 func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) error {
+	failpoint.Inject("SinkWorkerTaskHandlePause", func() { <-ctx.Done() })
 	for {
 		select {
 		case <-ctx.Done():
@@ -169,25 +170,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			// events have been reported. Then we can continue the table
 			// at the checkpoint position.
 			case tablesink.SinkInternalError:
-				task.tableSink.closeAndClearTableSink()
 				// After the table sink is cleared all pending events are sent out or dropped.
 				// So we can re-add the table into sinkMemQuota.
 				w.sinkMemQuota.ClearTable(task.tableSink.span)
-
-				// Restart the table sink based on the checkpoint position.
-				if err := task.tableSink.restart(ctx); err == nil {
-					checkpointTs := task.tableSink.getCheckpointTs()
-					ckpt := checkpointTs.ResolvedMark()
-					lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
-					performCallback(lastWrittenPos)
-					log.Info("table sink has been restarted",
-						zap.String("namespace", w.changefeedID.Namespace),
-						zap.String("changefeed", w.changefeedID.ID),
-						zap.Stringer("span", &task.span),
-						zap.Any("lastWrittenPos", lastWrittenPos),
-						zap.String("sinkError", finalErr.Error()))
-					finalErr = err
-				}
+				performCallback(advancer.lastPos)
+				finalErr = nil
 			default:
 			}
 		}
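The new test drives the failure path with pingcap/failpoint: failpoint.Inject marks a site in the code, failpoint.Enable arms it by its full package path, and a term such as "1*return" fires exactly once. Since Inject is only active after the failpoint-ctl rewriter has run, the runnable sketch below calls failpoint.Eval directly, which is what a rewritten Inject expands to; the failpoint name here is made up for the example.

    package main

    import (
        "fmt"

        "github.com/pingcap/failpoint"
    )

    // fpName is a hypothetical failpoint path, not one defined in tiflow.
    const fpName = "example/WriteEventsFail"

    func writeEvents() error {
        // In tiflow this would be failpoint.Inject("WriteEventsFail", ...);
        // failpoint-ctl rewrites Inject into an Eval call like this one.
        if _, err := failpoint.Eval(fpName); err == nil {
            return fmt.Errorf("injected write failure")
        }
        return nil
    }

    func main() {
        fmt.Println(writeEvents()) // <nil>: failpoint not armed

        // "1*return" arms the failpoint for exactly one hit.
        if err := failpoint.Enable(fpName, "1*return"); err != nil {
            panic(err)
        }
        defer failpoint.Disable(fpName)

        fmt.Println(writeEvents()) // injected write failure
        fmt.Println(writeEvents()) // <nil>: the single term is exhausted
    }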
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 8d21d69709b..45ce8c55e19 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -166,6 +166,11 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err
 			break
 		}
 	}
+	if model.NewResolvedTs(startTs).Greater(t.tableSink.checkpointTs) {
+		t.tableSink.checkpointTs = model.NewResolvedTs(startTs)
+		t.tableSink.resolvedTs = model.NewResolvedTs(startTs)
+		t.tableSink.advanced = time.Now()
+	}
 	t.state.Store(tablepb.TableStateReplicating)
 	return nil
 }
@@ -363,6 +368,15 @@ func (t *tableSinkWrapper) doTableSinkClear() {
 	t.tableSink.version = 0
 }
 
+func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
+	t.tableSink.RLock()
+	defer t.tableSink.RUnlock()
+	if t.tableSink.s != nil {
+		err = t.tableSink.s.CheckHealth()
+	}
+	return
+}
+
 // When the attached sink fail, there can be some events that have already been
 // committed at downstream but we don't know. So we need to update `replicateTs`
 // of the table so that we can re-send those events later.
diff --git a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
index 6cc1d1f35dd..be470b6d357 100644
--- a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
+++ b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
@@ -14,6 +14,8 @@
 package blackhole
 
 import (
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sink/dmlsink"
@@ -33,14 +35,16 @@ func NewDMLSink() *DMLSink {
 }
 
 // WriteEvents log the events.
-func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) error {
-	for _, row := range rows {
-		// NOTE: don't change the log, some tests depend on it.
-		log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event))
-		row.Callback()
+func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) (err error) {
+	failpoint.Inject("WriteEventsFail", func() { err = errors.New("InjectedErrorForWriteEventsFail") })
+	if err == nil {
+		for _, row := range rows {
+			// NOTE: don't change the log, some tests depend on it.
+			log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event))
+			row.Callback()
+		}
 	}
-
-	return nil
+	return
 }
 
 // Scheme return the scheme of the sink.
diff --git a/cdc/sink/tablesink/table_sink.go b/cdc/sink/tablesink/table_sink.go
index 588b30aabf9..69f6f61db11 100644
--- a/cdc/sink/tablesink/table_sink.go
+++ b/cdc/sink/tablesink/table_sink.go
@@ -37,6 +37,8 @@
 	Close()
 	// AsyncClose closes the table sink asynchronously. Returns true if it's closed.
 	AsyncClose() bool
+	// CheckHealth checks whether the associated sink backend is healthy or not.
+	CheckHealth() error
 }
 
 // SinkInternalError means the error comes from sink internal.
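CheckHealth errors are wrapped in tablesink.SinkInternalError so that callers such as generateSinkTasks above can separate restartable sink failures from errors that must stop the changefeed. A minimal sketch of that dispatch follows; the SinkInternalError here embeds the error rather than mirroring tiflow's exact struct layout.

    package main

    import (
        "fmt"

        "github.com/pingcap/errors"
    )

    // SinkInternalError marks failures that originate inside the sink and
    // can be handled by restarting the table sink (simplified stand-in).
    type SinkInternalError struct{ error }

    func handle(err error) string {
        // errors.Cause strips annotations so the type switch sees the
        // original wrapper, mirroring the switch in generateSinkTasks.
        switch errors.Cause(err).(type) {
        case SinkInternalError:
            return "restart table sink from its checkpoint"
        default:
            return "propagate the error to the changefeed"
        }
    }

    func main() {
        internal := SinkInternalError{fmt.Errorf("broken pipe")}
        fmt.Println(handle(errors.Annotate(internal, "write failed")))
        fmt.Println(handle(fmt.Errorf("context canceled")))
    }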
diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go
index da067501727..5ee1b869dba 100644
--- a/cdc/sink/tablesink/table_sink_impl.go
+++ b/cdc/sink/tablesink/table_sink_impl.go
@@ -158,6 +158,14 @@ func (e *EventTableSink[E, P]) AsyncClose() bool {
 	return false
 }
 
+// CheckHealth checks whether the associated sink backend is healthy or not.
+func (e *EventTableSink[E, P]) CheckHealth() error {
+	if err := e.backendSink.WriteEvents(); err != nil {
+		return SinkInternalError{err}
+	}
+	return nil
+}
+
 func (e *EventTableSink[E, P]) freeze() {
 	// Notice: We have to set the state to stopping first,
 	// otherwise the progressTracker may be advanced incorrectly.
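The implementation reuses the sink's write path as the probe: calling WriteEvents with no events exercises the backend's error reporting without enqueueing anything. The sketch below shows the idea with a simplified Sink interface; it is not tiflow's dmlsink API.

    package main

    import "fmt"

    // Sink is a simplified stand-in for a table sink backend.
    type Sink interface {
        WriteEvents(events ...string) error
    }

    type flakySink struct{ broken bool }

    func (s *flakySink) WriteEvents(events ...string) error {
        if s.broken {
            return fmt.Errorf("downstream unavailable")
        }
        return nil
    }

    // checkHealth mirrors the idea behind EventTableSink.CheckHealth: an
    // empty variadic write surfaces any backend error, which the caller
    // then wraps and classifies.
    func checkHealth(s Sink) error {
        if err := s.WriteEvents(); err != nil {
            return fmt.Errorf("sink internal error: %w", err)
        }
        return nil
    }

    func main() {
        s := &flakySink{}
        fmt.Println(checkHealth(s)) // <nil>
        s.broken = true
        fmt.Println(checkHealth(s)) // sink internal error: downstream unavailable
    }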