diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index f218ff65a94..847e7d7a507 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -51,8 +51,8 @@ type TablePipeline interface { Status() TableStatus // Cancel stops this table pipeline immediately and destroy all resources created by this table pipeline Cancel() - // Wait waits for all node destroyed and returns errors - Wait() []error + // Wait waits for table pipeline destroyed + Wait() } type tablePipelineImpl struct { @@ -125,9 +125,9 @@ func (t *tablePipelineImpl) Cancel() { t.cancel() } -// Wait waits for all node destroyed and returns errors -func (t *tablePipelineImpl) Wait() []error { - return t.p.Wait() +// Wait waits for table pipeline destroyed +func (t *tablePipelineImpl) Wait() { + t.p.Wait() } // NewTablePipeline creates a table pipeline @@ -145,7 +145,7 @@ func NewTablePipeline(ctx context.Context, tableName string, replicaInfo *model.TableReplicaInfo, sink sink.Sink, - targetTs model.Ts) (context.Context, TablePipeline) { + targetTs model.Ts) TablePipeline { ctx, cancel := context.WithCancel(ctx) tablePipeline := &tablePipelineImpl{ tableID: tableID, @@ -154,7 +154,7 @@ func NewTablePipeline(ctx context.Context, cancel: cancel, } - ctx, p := pipeline.NewPipeline(ctx, 500*time.Millisecond) + p := pipeline.NewPipeline(ctx, 500*time.Millisecond) p.AppendNode(ctx, "puller", newPullerNode(changefeedID, credential, kvStorage, limitter, tableID, replicaInfo, tableName)) p.AppendNode(ctx, "sorter", newSorterNode(sortEngine, sortDir, changefeedID, tableName, tableID)) p.AppendNode(ctx, "mounter", newMounterNode(mounter)) @@ -165,5 +165,5 @@ func NewTablePipeline(ctx context.Context, tablePipeline.sinkNode = newSinkNode(sink, replicaInfo.StartTs, targetTs) p.AppendNode(ctx, "sink", tablePipeline.sinkNode) tablePipeline.p = p - return ctx, tablePipeline + return tablePipeline } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index a47a939dd9f..645ea7dbebc 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -662,6 +662,15 @@ func (p *processor) createTablePipelineImpl(ctx context.Context, tableID model.T SchemaStorage: p.schemaStorage, Config: p.changefeed.Info.Config, }) + cdcCtx = cdccontext.WithErrorHandler(cdcCtx, func(err error) error { + if cerror.ErrTableProcessorStoppedSafely.Equal(err) || + errors.Cause(errors.Cause(err)) == context.Canceled { + return nil + } + p.sendError(err) + return nil + }) + kvStorage, err := util.KVStorageFromCtx(ctx) if err != nil { return nil, errors.Trace(err) @@ -680,7 +689,7 @@ func (p *processor) createTablePipelineImpl(ctx context.Context, tableID model.T } sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs) - _, table := tablepipeline.NewTablePipeline( + table := tablepipeline.NewTablePipeline( cdcCtx, p.changefeed.ID, p.credential, @@ -698,12 +707,7 @@ func (p *processor) createTablePipelineImpl(ctx context.Context, tableID model.T p.wg.Add(1) p.metricSyncTableNumGauge.Inc() go func() { - for _, err := range table.Wait() { - if cerror.ErrTableProcessorStoppedSafely.Equal(err) || errors.Cause(err) == context.Canceled { - continue - } - p.sendError(err) - } + table.Wait() p.wg.Done() p.metricSyncTableNumGauge.Dec() log.Debug("Table pipeline exited", zap.Int64("tableID", tableID), diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 547170d0094..bff4f3eb0ec 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -165,7 +165,7 @@ func (m *mockTablePipeline) Cancel() { m.canceled = true } -func (m *mockTablePipeline) Wait() []error { +func (m *mockTablePipeline) Wait() { panic("not implemented") // TODO: Implement } diff --git a/pkg/context/context.go b/pkg/context/context.go index de0e36ad949..f3b24cdd4bf 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -15,10 +15,12 @@ package context import ( "context" + "log" "github.com/pingcap/ticdc/cdc/entry" "github.com/pingcap/ticdc/pkg/config" pd "github.com/tikv/pd/client" + "go.uber.org/zap" ) // Vars contains some vars which can be used anywhere in a pipeline @@ -73,7 +75,13 @@ func (ctx *rootContext) Vars() *Vars { return ctx.vars } -func (ctx *rootContext) Throw(error) { /* do nothing */ } +func (ctx *rootContext) Throw(err error) { + if err == nil { + return + } + // make sure all error has been catched + log.Panic("an error has escaped, please report a bug", zap.Error(err)) +} type stdContext struct { stdCtx context.Context @@ -104,11 +112,13 @@ func WithCancel(ctx Context) (Context, context.CancelFunc) { type throwContext struct { Context - f func(error) + f func(error) error } // WithErrorHandler creates a new context that can watch the Throw function -func WithErrorHandler(ctx Context, f func(error)) Context { +// if the function `f` specified in WithErrorHandler returns an error, +// the error will be thrown to the parent context. +func WithErrorHandler(ctx Context, f func(error) error) Context { return &throwContext{ Context: ctx, f: f, @@ -119,6 +129,7 @@ func (ctx *throwContext) Throw(err error) { if err == nil { return } - ctx.f(err) - ctx.Context.Throw(err) + if err := ctx.f(err); err != nil { + ctx.Context.Throw(err) + } } diff --git a/pkg/context/context_test.go b/pkg/context/context_test.go index 9166b8eccbb..04ca9e14197 100644 --- a/pkg/context/context_test.go +++ b/pkg/context/context_test.go @@ -84,9 +84,10 @@ func (s *contextSuite) TestThrow(c *check.C) { stdCtx := context.Background() ctx := NewContext(stdCtx, &Vars{}) ctx, cancel := WithCancel(ctx) - ctx = WithErrorHandler(ctx, func(err error) { + ctx = WithErrorHandler(ctx, func(err error) error { c.Assert(err.Error(), check.Equals, "mock error") cancel() + return nil }) ctx.Throw(nil) ctx.Throw(errors.New("mock error")) @@ -98,8 +99,8 @@ func (s *contextSuite) TestThrowCascade(c *check.C) { stdCtx := context.Background() ctx := NewContext(stdCtx, &Vars{}) ctx, cancel := WithCancel(ctx) - var errNum1, errNum2 int - ctx = WithErrorHandler(ctx, func(err error) { + var errNum1, errNum2, errNum3 int + ctx = WithErrorHandler(ctx, func(err error) error { if err.Error() == "mock error" { errNum1++ } else if err.Error() == "mock error2" { @@ -107,13 +108,37 @@ func (s *contextSuite) TestThrowCascade(c *check.C) { } else { c.Fail() } + return nil }) - ctx2 := WithErrorHandler(ctx, func(err error) { - errNum2++ - c.Assert(err.Error(), check.Equals, "mock error2") + ctx2 := WithErrorHandler(ctx, func(err error) error { + if err.Error() == "mock error2" { + errNum2++ + return err + } else if err.Error() == "mock error3" { + errNum3++ + } else { + c.Fail() + } + return nil }) ctx2.Throw(errors.New("mock error2")) + ctx2.Throw(errors.New("mock error3")) ctx.Throw(errors.New("mock error")) + c.Assert(errNum1, check.Equals, 1) + c.Assert(errNum2, check.Equals, 2) + c.Assert(errNum3, check.Equals, 1) cancel() <-ctx.Done() } + +func (s *contextSuite) TestThrowPanic(c *check.C) { + defer testleak.AfterTest(c)() + defer func() { + panicMsg := recover() + c.Assert(panicMsg, check.Equals, "an error has escaped, please report a bug{error 26 0 mock error}") + }() + stdCtx := context.Background() + ctx := NewContext(stdCtx, &Vars{}) + ctx.Throw(nil) + ctx.Throw(errors.New("mock error")) +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 13eaa3ec13f..84bc07d71bf 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -34,14 +34,12 @@ type Pipeline struct { header headRunner runners []runner runnersWg sync.WaitGroup - errors []error - errorsMu sync.Mutex closeMu sync.Mutex isClosed bool } // NewPipeline creates a new pipeline -func NewPipeline(ctx context.Context, tickDuration time.Duration) (context.Context, *Pipeline) { +func NewPipeline(ctx context.Context, tickDuration time.Duration) *Pipeline { header := make(headRunner, 4) runners := make([]runner, 0, 16) runners = append(runners, header) @@ -49,33 +47,34 @@ func NewPipeline(ctx context.Context, tickDuration time.Duration) (context.Conte header: header, runners: runners, } - ctx = context.WithErrorHandler(ctx, func(err error) { - p.addError(err) - p.close() - }) go func() { + var tickCh <-chan time.Time if tickDuration > 0 { ticker := time.NewTicker(tickDuration) defer ticker.Stop() - for { - select { - case <-ticker.C: - p.SendToFirstNode(TickMessage()) //nolint:errcheck - case <-ctx.Done(): - p.close() - return - } - } + tickCh = ticker.C } else { - <-ctx.Done() - p.close() + tickCh = make(chan time.Time) + } + for { + select { + case <-tickCh: + p.SendToFirstNode(TickMessage()) //nolint:errcheck + case <-ctx.Done(): + p.close() + return + } } }() - return ctx, p + return p } // AppendNode appends the node to the pipeline func (p *Pipeline) AppendNode(ctx context.Context, name string, node Node) { + ctx = context.WithErrorHandler(ctx, func(err error) error { + p.close() + return err + }) lastRunner := p.runners[len(p.runners)-1] runner := newNodeRunner(name, node, lastRunner) p.runners = append(p.runners, runner) @@ -92,7 +91,7 @@ func (p *Pipeline) driveRunner(ctx context.Context, previousRunner, runner runne }() err := runner.run(ctx) if err != nil { - p.addError(err) + ctx.Throw(err) log.Error("found error when running the node", zap.String("name", runner.getName()), zap.Error(err)) } } @@ -118,16 +117,7 @@ func (p *Pipeline) close() { } } -func (p *Pipeline) addError(err error) { - p.errorsMu.Lock() - defer p.errorsMu.Unlock() - p.errors = append(p.errors, err) -} - -// Wait all the nodes exited and return the errors found from nodes -func (p *Pipeline) Wait() []error { +// Wait all the nodes exited +func (p *Pipeline) Wait() { p.runnersWg.Wait() - p.errorsMu.Lock() - defer p.errorsMu.Unlock() - return p.errors } diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index 8c05b6e09e5..a4ec9e638bc 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -103,7 +103,11 @@ func (s *pipelineSuite) TestPipelineUsage(c *check.C) { defer testleak.AfterTest(c)() ctx := context.NewContext(stdCtx.Background(), &context.Vars{}) ctx, cancel := context.WithCancel(ctx) - ctx, p := NewPipeline(ctx, -1) + ctx = context.WithErrorHandler(ctx, func(err error) error { + c.Fatal(err) + return err + }) + p := NewPipeline(ctx, -1) p.AppendNode(ctx, "echo node", echoNode{}) p.AppendNode(ctx, "check node", &checkNode{ c: c, @@ -176,8 +180,7 @@ func (s *pipelineSuite) TestPipelineUsage(c *check.C) { })) c.Assert(err, check.IsNil) cancel() - errs := p.Wait() - c.Assert(len(errs), check.Equals, 0) + p.Wait() } type errorNode struct { @@ -211,7 +214,11 @@ func (s *pipelineSuite) TestPipelineError(c *check.C) { ctx := context.NewContext(stdCtx.Background(), &context.Vars{}) ctx, cancel := context.WithCancel(ctx) defer cancel() - ctx, p := NewPipeline(ctx, -1) + ctx = context.WithErrorHandler(ctx, func(err error) error { + c.Assert(err.Error(), check.Equals, "error node throw an error, index: 3") + return nil + }) + p := NewPipeline(ctx, -1) p.AppendNode(ctx, "echo node", echoNode{}) p.AppendNode(ctx, "error node", &errorNode{c: c}) p.AppendNode(ctx, "check node", &checkNode{ @@ -254,9 +261,7 @@ func (s *pipelineSuite) TestPipelineError(c *check.C) { }, }, })) - errs := p.Wait() - c.Assert(len(errs), check.Equals, 1) - c.Assert(errs[0].Error(), check.Equals, "error node throw an error, index: 3") + p.Wait() } type throwNode struct { @@ -291,7 +296,12 @@ func (s *pipelineSuite) TestPipelineThrow(c *check.C) { ctx := context.NewContext(stdCtx.Background(), &context.Vars{}) ctx, cancel := context.WithCancel(ctx) defer cancel() - ctx, p := NewPipeline(ctx, -1) + var errs []error + ctx = context.WithErrorHandler(ctx, func(err error) error { + errs = append(errs, err) + return nil + }) + p := NewPipeline(ctx, -1) p.AppendNode(ctx, "echo node", echoNode{}) p.AppendNode(ctx, "error node", &throwNode{c: c}) err := p.SendToFirstNode(PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -318,13 +328,13 @@ func (s *pipelineSuite) TestPipelineThrow(c *check.C) { if err != nil { // pipeline closed before the second message was sent c.Assert(cerror.ErrSendToClosedPipeline.Equal(err), check.IsTrue) - errs := p.Wait() + p.Wait() c.Assert(len(errs), check.Equals, 2) c.Assert(errs[0].Error(), check.Equals, "error node throw an error, index: 3") c.Assert(errs[1].Error(), check.Equals, "error node throw an error, index: 4") } else { // the second message was sent before pipeline closed - errs := p.Wait() + p.Wait() c.Assert(len(errs), check.Equals, 4) c.Assert(errs[0].Error(), check.Equals, "error node throw an error, index: 3") c.Assert(errs[1].Error(), check.Equals, "error node throw an error, index: 4") @@ -337,7 +347,11 @@ func (s *pipelineSuite) TestPipelineAppendNode(c *check.C) { defer testleak.AfterTest(c)() ctx := context.NewContext(stdCtx.Background(), &context.Vars{}) ctx, cancel := context.WithCancel(ctx) - ctx, p := NewPipeline(ctx, -1) + ctx = context.WithErrorHandler(ctx, func(err error) error { + c.Fatal(err) + return err + }) + p := NewPipeline(ctx, -1) err := p.SendToFirstNode(PolymorphicEventMessage(&model.PolymorphicEvent{ Row: &model.RowChangedEvent{ Table: &model.TableName{ @@ -413,8 +427,7 @@ func (s *pipelineSuite) TestPipelineAppendNode(c *check.C) { }) cancel() - errs := p.Wait() - c.Assert(len(errs), check.Equals, 0) + p.Wait() } type panicNode struct { @@ -446,7 +459,14 @@ func (s *pipelineSuite) TestPipelinePanic(c *check.C) { ctx := context.NewContext(stdCtx.Background(), &context.Vars{}) ctx, cancel := context.WithCancel(ctx) defer cancel() - ctx, p := NewPipeline(ctx, -1) + ctx = context.WithErrorHandler(ctx, func(err error) error { + c.Fatal(err) + return err + }) + ctx = context.WithErrorHandler(ctx, func(err error) error { + return nil + }) + p := NewPipeline(ctx, -1) p.AppendNode(ctx, "panic", panicNode{}) - c.Assert(p.Wait(), check.HasLen, 0) + p.Wait() }