
Commit

sink(cdc): always handle sink failures for cases with sync-point enabled (#10132)

close #10091
hicqu authored Nov 23, 2023
1 parent 9366da7 commit f35b76a
Showing 7 changed files with 121 additions and 27 deletions.
45 changes: 41 additions & 4 deletions cdc/processor/sinkmanager/manager.go
@@ -531,13 +531,50 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := m.getUpperBound(tableSink.getUpperBoundTs())

if !tableSink.initTableSink() {
// The table hasn't been attached to a sink.
m.sinkProgressHeap.push(slowestTableProgress)
continue
}

if sinkErr := tableSink.checkTableSinkHealth(); sinkErr != nil {
switch errors.Cause(sinkErr).(type) {
case tablesink.SinkInternalError:
tableSink.closeAndClearTableSink()
if restartErr := tableSink.restart(ctx); restartErr == nil {
// Restart the table sink based on the checkpoint position.
ckpt := tableSink.getCheckpointTs().ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
p := &progress{
span: tableSink.span,
nextLowerBoundPos: lastWrittenPos.Next(),
version: slowestTableProgress.version,
}
m.sinkProgressHeap.push(p)
log.Info("table sink has been restarted",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &tableSink.span),
zap.Any("lastWrittenPos", lastWrittenPos),
zap.String("sinkError", sinkErr.Error()))
} else {
m.sinkProgressHeap.push(slowestTableProgress)
log.Warn("table sink restart fail",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &tableSink.span),
zap.String("sinkError", sinkErr.Error()),
zap.Error(restartErr))
}
default:
return sinkErr
}
continue
}

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
m.sinkProgressHeap.push(slowestTableProgress)
continue
}
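The new health check above decides between two outcomes: a tablesink.SinkInternalError leads to closing, restarting, and rescheduling the table sink from its checkpoint, while any other error is returned from generateSinkTasks. The classification hinges on unwrapping the error with errors.Cause before the type switch. Below is a minimal, self-contained sketch of that pattern; internalSinkError is a hypothetical stand-in for tablesink.SinkInternalError, not the real type.

package main

import (
    "fmt"

    "github.com/pingcap/errors"
)

// internalSinkError is a hypothetical stand-in for tablesink.SinkInternalError:
// it marks failures that originate inside the sink and can be handled by
// restarting the table sink from its checkpoint.
type internalSinkError struct{ cause error }

func (e internalSinkError) Error() string { return e.cause.Error() }

// classify mirrors the switch in generateSinkTasks: unwrap with errors.Cause,
// treat internal sink errors as recoverable, and propagate everything else.
func classify(err error) string {
    switch errors.Cause(err).(type) {
    case internalSinkError:
        return "restart table sink from checkpoint"
    default:
        return "return error to the changefeed"
    }
}

func main() {
    writeErr := errors.Trace(internalSinkError{cause: errors.New("downstream write failed")})
    fmt.Println(classify(writeErr))              // restart table sink from checkpoint
    fmt.Println(classify(errors.New("io: EOF"))) // return error to the changefeed
}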
42 changes: 42 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
@@ -372,3 +372,45 @@ func TestSinkManagerNeedsStuckCheck(t *testing.T) {

require.False(t, manager.needsStuckCheck())
}

func TestSinkManagerRestartTableSinks(t *testing.T) {
failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.ChangeFeedID{}, changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

span := tablepb.Span{TableID: 1}
manager.AddTable(span, 1, 100)
require.Nil(t, manager.StartTable(span, 2))
table, exists := manager.tableSinks.Load(span)
require.True(t, exists)

table.(*tableSinkWrapper).updateReceivedSorterResolvedTs(4)
table.(*tableSinkWrapper).updateBarrierTs(4)
select {
case task := <-manager.sinkTaskChan:
require.Equal(t, engine.Position{StartTs: 0, CommitTs: 3}, task.lowerBound)
task.callback(engine.Position{StartTs: 3, CommitTs: 4})
case <-time.After(2 * time.Second):
panic("should always get a sink task")
}

// With the blackhole/WriteEventsFail failpoint enabled, the sink manager should restart
// the table sink at its checkpoint.
failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail", "1*return")
defer failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail")
select {
case task := <-manager.sinkTaskChan:
require.Equal(t, engine.Position{StartTs: 2, CommitTs: 2}, task.lowerBound)
task.callback(engine.Position{StartTs: 3, CommitTs: 4})
case <-time.After(2 * time.Second):
panic("should always get a sink task")
}
}
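The second expected lower bound in this test, engine.Position{StartTs: 2, CommitTs: 2}, falls out of the restart path in manager.go: here the checkpoint's resolved mark is 2 (the start ts), so lastWrittenPos becomes {StartTs: 1, CommitTs: 2} and the next task begins at its successor. A rough sketch of that arithmetic follows, with a simplified stand-in for engine.Position whose Next is assumed to advance StartTs by one within the same commit, which is all this example relies on.

package main

import "fmt"

// Position is a simplified stand-in for engine.Position.
type Position struct {
    StartTs  uint64
    CommitTs uint64
}

// Next is assumed to advance StartTs by one within the same commit.
func (p Position) Next() Position {
    return Position{StartTs: p.StartTs + 1, CommitTs: p.CommitTs}
}

func main() {
    ckpt := uint64(2) // resolved mark of the restarted table sink
    lastWrittenPos := Position{StartTs: ckpt - 1, CommitTs: ckpt}
    fmt.Printf("restart lower bound: %+v\n", lastWrittenPos.Next())
    // Output: restart lower bound: {StartTs:2 CommitTs:2}
}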
19 changes: 3 additions & 16 deletions cdc/processor/sinkmanager/table_sink_worker.go
@@ -96,6 +96,7 @@ func newSinkWorker(
}

func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) error {
failpoint.Inject("SinkWorkerTaskHandlePause", func() { <-ctx.Done() })
for {
select {
case <-ctx.Done():
@@ -169,25 +170,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// events have been reported. Then we can continue the table
// at the checkpoint position.
case tablesink.SinkInternalError:
performCallback(advancer.lastPos)
finalErr = nil
default:
}
}
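After this change the worker no longer tries to restart the sink itself: on a SinkInternalError it simply reports the position it had already advanced to and clears the error, and the manager's health check (in manager.go above) performs the actual restart on the next scheduling round. The sketch below is schematic only; the names and the sentinel error are illustrative, not the real tiflow APIs.

package main

import (
    "errors"
    "fmt"
)

// errSinkInternal is an illustrative stand-in for tablesink.SinkInternalError.
var errSinkInternal = errors.New("sink internal error")

// finishTask sketches the worker side: acknowledge the progress already made
// and swallow the internal error so the task loop keeps running.
func finishTask(taskErr error, lastPos uint64, report func(uint64)) error {
    if errors.Is(taskErr, errSinkInternal) {
        report(lastPos)
        return nil
    }
    return taskErr
}

// scheduleTable sketches the manager side: a failed health check on the next
// round triggers a restart from the table's checkpoint.
func scheduleTable(healthErr error, checkpoint uint64, restartFrom func(uint64)) {
    if healthErr != nil {
        restartFrom(checkpoint)
    }
}

func main() {
    var reported, restartedFrom uint64
    _ = finishTask(errSinkInternal, 7, func(p uint64) { reported = p })
    scheduleTable(errSinkInternal, reported, func(c uint64) { restartedFrom = c })
    fmt.Println(reported, restartedFrom) // 7 7
}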
14 changes: 14 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -166,6 +166,11 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err
break
}
}
if model.NewResolvedTs(startTs).Greater(t.tableSink.checkpointTs) {
t.tableSink.checkpointTs = model.NewResolvedTs(startTs)
t.tableSink.resolvedTs = model.NewResolvedTs(startTs)
t.tableSink.advanced = time.Now()
}
t.state.Store(tablepb.TableStateReplicating)
return nil
}
@@ -363,6 +368,15 @@ func (t *tableSinkWrapper) doTableSinkClear() {
t.tableSink.version = 0
}

func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
t.tableSink.RLock()
defer t.tableSink.RUnlock()
if t.tableSink.s != nil {
err = t.tableSink.s.CheckHealth()
}
return
}

// When the attached sink fails, there can be some events that have already been
// committed downstream without us knowing. So we need to update `replicateTs`
// of the table so that we can re-send those events later.
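The new block in start() above makes a restarted table sink begin with its checkpoint and resolved ts advanced to startTs instead of whatever values were left over from before the failure, and it never moves them backwards. Together with the replicateTs comment, this reflects the at-least-once contract: events from the checkpoint onward may simply be re-sent. A toy sketch of that forward-only update, with a simplified stand-in for model.ResolvedTs:

package main

import "fmt"

// resolvedTs is a simplified stand-in for model.ResolvedTs.
type resolvedTs struct{ Ts uint64 }

func (r resolvedTs) Greater(other resolvedTs) bool { return r.Ts > other.Ts }

// advanceOnStart mirrors the intent of the new block in start(): only move the
// checkpoint/resolved ts forward to startTs, never backwards.
func advanceOnStart(startTs uint64, checkpoint, resolved *resolvedTs) {
    start := resolvedTs{Ts: startTs}
    if start.Greater(*checkpoint) {
        *checkpoint = start
        *resolved = start
    }
}

func main() {
    ckpt, res := resolvedTs{Ts: 5}, resolvedTs{Ts: 5}
    advanceOnStart(9, &ckpt, &res) // restart ahead of the old checkpoint: advance
    fmt.Println(ckpt.Ts, res.Ts)   // 9 9
    advanceOnStart(3, &ckpt, &res) // stale start ts: keep the newer checkpoint
    fmt.Println(ckpt.Ts, res.Ts)   // 9 9
}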
18 changes: 11 additions & 7 deletions cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
Expand Up @@ -14,6 +14,8 @@
package blackhole

import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
@@ -33,14 +35,16 @@ func NewDMLSink() *DMLSink {
}

// WriteEvents logs the events.
func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) (err error) {
failpoint.Inject("WriteEventsFail", func() { err = errors.New("InjectedErrorForWriteEventsFail") })
if err == nil {
for _, row := range rows {
// NOTE: don't change the log, some tests depend on it.
log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event))
row.Callback()
}
}

return
}

// Scheme return the scheme of the sink.
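The WriteEventsFail hook uses the usual github.com/pingcap/failpoint pattern: failpoint.Inject marks the injection point inside the sink, and a test switches it on with failpoint.Enable using the full package path plus the failpoint name ("return" fires every time, "1*return" exactly once). Note that Inject is only a marker; the failpoint-ctl rewrite (make failpoint-enable in tiflow) has to be applied for it to take effect. Below is a hedged sketch of the pattern using a placeholder package path, not a real tiflow one.

package main

import (
    "fmt"

    "github.com/pingcap/errors"
    "github.com/pingcap/failpoint"
)

// writeBatch shows the injection side: the closure runs only when the
// "writeBatchFail" failpoint is enabled (and only after failpoint-ctl has
// rewritten the Inject marker into an evaluation).
func writeBatch(rows []string) (err error) {
    failpoint.Inject("writeBatchFail", func() {
        err = errors.New("injected write failure")
    })
    if err != nil {
        return err
    }
    // ... write rows to the downstream here ...
    return nil
}

func main() {
    // The activation key is the package path plus the failpoint name;
    // "example.com/demo" is a placeholder module path. "1*return" makes
    // the failpoint fire exactly once.
    if err := failpoint.Enable("example.com/demo/writeBatchFail", "1*return"); err != nil {
        fmt.Println("enable failpoint:", err)
        return
    }
    defer failpoint.Disable("example.com/demo/writeBatchFail")

    fmt.Println(writeBatch([]string{"r1"})) // injected write failure (with the rewrite applied)
    fmt.Println(writeBatch([]string{"r2"})) // <nil>
}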
2 changes: 2 additions & 0 deletions cdc/sink/tablesink/table_sink.go
@@ -37,6 +37,8 @@ type TableSink interface {
Close()
// AsyncClose closes the table sink asynchronously. Returns true if it's closed.
AsyncClose() bool
// CheckHealth checks whether the associated sink backend is healthy or not.
CheckHealth() error
}

// SinkInternalError means the error comes from sink internal.
8 changes: 8 additions & 0 deletions cdc/sink/tablesink/table_sink_impl.go
@@ -158,6 +158,14 @@ func (e *EventTableSink[E, P]) AsyncClose() bool {
return false
}

// CheckHealth checks whether the associated sink backend is healthy or not.
func (e *EventTableSink[E, P]) CheckHealth() error {
if err := e.backendSink.WriteEvents(); err != nil {
return SinkInternalError{err}
}
return nil
}

func (e *EventTableSink[E, P]) freeze() {
// Notice: We have to set the state to stopping first,
// otherwise the progressTracker may be advanced incorrectly.
