From 2c66164954009a1a0cb489d145b0c242b8c891b2 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 2 Aug 2021 23:23:06 +0800 Subject: [PATCH] pipeline: use Message value to reduce GC pressure (#2415) (#2441) --- cdc/processor/pipeline/cyclic_mark_test.go | 4 +- cdc/processor/pipeline/mounter.go | 2 +- cdc/processor/pipeline/mounter_test.go | 4 +- cdc/processor/pipeline/sink_test.go | 12 ++-- pkg/pipeline/context.go | 20 +++--- pkg/pipeline/message.go | 24 +++---- pkg/pipeline/pipeline.go | 2 +- pkg/pipeline/pipeline_test.go | 73 ++++++++++++++++++++-- pkg/pipeline/runner.go | 14 ++--- pkg/pipeline/test.go | 8 +-- 10 files changed, 112 insertions(+), 51 deletions(-) diff --git a/cdc/processor/pipeline/cyclic_mark_test.go b/cdc/processor/pipeline/cyclic_mark_test.go index 95e775960c2..350b4be2141 100644 --- a/cdc/processor/pipeline/cyclic_mark_test.go +++ b/cdc/processor/pipeline/cyclic_mark_test.go @@ -144,9 +144,9 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) { }, }) n := newCyclicMarkNode(markTableID) - err := n.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)) + err := n.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)) c.Assert(err, check.IsNil) - outputCh := make(chan *pipeline.Message) + outputCh := make(chan pipeline.Message) var wg sync.WaitGroup wg.Add(2) go func() { diff --git a/cdc/processor/pipeline/mounter.go b/cdc/processor/pipeline/mounter.go index a38b313b96e..1bfc94b43c4 100644 --- a/cdc/processor/pipeline/mounter.go +++ b/cdc/processor/pipeline/mounter.go @@ -82,7 +82,7 @@ func (n *mounterNode) Init(ctx pipeline.NodeContext) error { } for _, msg := range msgs { - msg := msg.(*pipeline.Message) + msg := msg.(pipeline.Message) if msg.Tp != pipeline.MessageTypePolymorphicEvent { // sends the control message directly to the next node ctx.SendToNextNode(msg) diff --git a/cdc/processor/pipeline/mounter_test.go b/cdc/processor/pipeline/mounter_test.go index 3ff2894735b..dbb19ccee56 100644 --- a/cdc/processor/pipeline/mounter_test.go +++ b/cdc/processor/pipeline/mounter_test.go @@ -109,7 +109,7 @@ func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) { p.AppendNode(ctx, "check", checkNode) var sentCount int64 - sendMsg := func(p *pipeline.Pipeline, msg *pipeline.Message) { + sendMsg := func(p *pipeline.Pipeline, msg pipeline.Message) { err := retry.Do(context.Background(), func() error { return p.SendToFirstNode(msg) }, retry.WithBackoffBaseDelay(10), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(100)) @@ -123,7 +123,7 @@ func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) { go func() { defer wg.Done() for i := 0; i < basicsTestMessageCount; i++ { - var msg *pipeline.Message + var msg pipeline.Message if i%100 == 0 { // generates a control message msg = pipeline.TickMessage() diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 34fe1fbd8ba..9aeb0f6a792 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -119,7 +119,7 @@ func (s *outputSuite) TestStatus(c *check.C) { // test stop at targetTs node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) - c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil) @@ -145,7 +145,7 @@ func (s *outputSuite) TestStatus(c *check.C) { // test the stop at ts command node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) - c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil) @@ -167,7 +167,7 @@ func (s *outputSuite) TestStatus(c *check.C) { // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) - c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil) @@ -200,7 +200,7 @@ func (s *outputSuite) TestManyTs(c *check.C) { }) sink := &mockSink{} node := newSinkNode(sink, 0, 10, &mockFlowController{}) - c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -257,7 +257,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) { }) sink := &mockSink{} node := newSinkNode(sink, 0, 10, &mockFlowController{}) - c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // nil row. c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -316,7 +316,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) { }) sink := &mockSink{} node := newSinkNode(sink, 0, 10, &mockFlowController{}) - c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) + c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // nil row. c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, diff --git a/pkg/pipeline/context.go b/pkg/pipeline/context.go index 0e4b2ffc48b..f3de42a1e74 100644 --- a/pkg/pipeline/context.go +++ b/pkg/pipeline/context.go @@ -20,18 +20,18 @@ type NodeContext interface { context.Context // Message returns the message sent by the previous node - Message() *Message + Message() Message // SendToNextNode sends the message to the next node - SendToNextNode(msg *Message) + SendToNextNode(msg Message) } type nodeContext struct { context.Context - msg *Message - outputCh chan *Message + msg Message + outputCh chan Message } -func newNodeContext(ctx context.Context, msg *Message, outputCh chan *Message) NodeContext { +func newNodeContext(ctx context.Context, msg Message, outputCh chan Message) NodeContext { return &nodeContext{ Context: ctx, msg: msg, @@ -39,27 +39,27 @@ func newNodeContext(ctx context.Context, msg *Message, outputCh chan *Message) N } } -func (ctx *nodeContext) Message() *Message { +func (ctx *nodeContext) Message() Message { return ctx.msg } -func (ctx *nodeContext) SendToNextNode(msg *Message) { +func (ctx *nodeContext) SendToNextNode(msg Message) { // The header channel should never be blocked ctx.outputCh <- msg } type messageContext struct { NodeContext - message *Message + message Message } -func withMessage(ctx NodeContext, msg *Message) NodeContext { +func withMessage(ctx NodeContext, msg Message) NodeContext { return messageContext{ NodeContext: ctx, message: msg, } } -func (ctx messageContext) Message() *Message { +func (ctx messageContext) Message() Message { return ctx.message } diff --git a/pkg/pipeline/message.go b/pkg/pipeline/message.go index 62b8458bc8e..303bf2cf75c 100644 --- a/pkg/pipeline/message.go +++ b/pkg/pipeline/message.go @@ -41,39 +41,35 @@ type Message struct { } // PolymorphicEventMessage creates the message of PolymorphicEvent -func PolymorphicEventMessage(event *model.PolymorphicEvent) *Message { - return &Message{ +func PolymorphicEventMessage(event *model.PolymorphicEvent) Message { + return Message{ Tp: MessageTypePolymorphicEvent, PolymorphicEvent: event, } } // CommandMessage creates the message of Command -func CommandMessage(command *Command) *Message { - return &Message{ +func CommandMessage(command *Command) Message { + return Message{ Tp: MessageTypeCommand, Command: command, } } // BarrierMessage creates the message of Command -func BarrierMessage(barrierTs model.Ts) *Message { - return &Message{ +func BarrierMessage(barrierTs model.Ts) Message { + return Message{ Tp: MessageTypeBarrier, BarrierTs: barrierTs, } } -// TickMessage is called frequently, -// to ease GC pressure we return a global variable. -var tickMsg *Message = &Message{ - Tp: MessageTypeTick, -} - // TickMessage creates the message of Tick // Note: the returned message is READ-ONLY. -func TickMessage() *Message { - return tickMsg +func TickMessage() Message { + return Message{ + Tp: MessageTypeTick, + } } // CommandType is the type of Command diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 08a725d03a5..12ab3fa9517 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -103,7 +103,7 @@ func (p *Pipeline) driveRunner(ctx context.Context, previousRunner, runner runne var pipelineTryAgainError error = cerror.ErrPipelineTryAgain.FastGenByArgs() // SendToFirstNode sends the message to the first node -func (p *Pipeline) SendToFirstNode(msg *Message) error { +func (p *Pipeline) SendToFirstNode(msg Message) error { p.closeMu.Lock() defer p.closeMu.Unlock() if p.isClosed { diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index 7a30e868c87..1b1fb3e041e 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -15,6 +15,7 @@ package pipeline import ( stdCtx "context" + "fmt" "testing" "time" @@ -76,7 +77,7 @@ func (e echoNode) Destroy(ctx NodeContext) error { type checkNode struct { c *check.C - expected []*Message + expected []Message index int } @@ -112,7 +113,7 @@ func (s *pipelineSuite) TestPipelineUsage(c *check.C) { p.AppendNode(ctx, "echo node", echoNode{}) p.AppendNode(ctx, "check node", &checkNode{ c: c, - expected: []*Message{ + expected: []Message{ PolymorphicEventMessage(&model.PolymorphicEvent{ Row: &model.RowChangedEvent{ Table: &model.TableName{ @@ -225,7 +226,7 @@ func (s *pipelineSuite) TestPipelineError(c *check.C) { p.AppendNode(ctx, "error node", &errorNode{c: c}) p.AppendNode(ctx, "check node", &checkNode{ c: c, - expected: []*Message{ + expected: []Message{ PolymorphicEventMessage(&model.PolymorphicEvent{ Row: &model.RowChangedEvent{ Table: &model.TableName{ @@ -380,7 +381,7 @@ func (s *pipelineSuite) TestPipelineAppendNode(c *check.C) { p.AppendNode(ctx, "check node", &checkNode{ c: c, - expected: []*Message{ + expected: []Message{ PolymorphicEventMessage(&model.PolymorphicEvent{ Row: &model.RowChangedEvent{ Table: &model.TableName{ @@ -475,3 +476,67 @@ func (s *pipelineSuite) TestPipelinePanic(c *check.C) { p.AppendNode(ctx, "panic", panicNode{}) p.Wait() } + +type forward struct { + ch chan Message +} + +func (n *forward) Init(ctx NodeContext) error { + return nil +} + +func (n *forward) Receive(ctx NodeContext) error { + m := ctx.Message() + if n.ch != nil { + n.ch <- m + } else { + ctx.SendToNextNode(m) + } + return nil +} + +func (n *forward) Destroy(ctx NodeContext) error { + return nil +} + +// Run the benchmark +// go test -benchmem -run=^$ -bench ^(BenchmarkPipeline)$ github.com/pingcap/ticdc/pkg/pipeline +func BenchmarkPipeline(b *testing.B) { + ctx := context.NewContext(stdCtx.Background(), &context.GlobalVars{}) + runnersSize, outputChannelSize := 2, 64 + + b.Run("BenchmarkPipeline", func(b *testing.B) { + for i := 1; i <= 8; i++ { + ctx, cancel := context.WithCancel(ctx) + ctx = context.WithErrorHandler(ctx, func(err error) error { + b.Fatal(err) + return err + }) + + ch := make(chan Message) + p := NewPipeline(ctx, -1, runnersSize, outputChannelSize) + for j := 0; j < i; j++ { + if (j + 1) == i { + // The last node + p.AppendNode(ctx, "forward node", &forward{ch: ch}) + } else { + p.AppendNode(ctx, "forward node", &forward{}) + } + } + + b.ResetTimer() + b.Run(fmt.Sprintf("%d node(s)", i), func(b *testing.B) { + for i := 0; i < b.N; i++ { + err := p.SendToFirstNode(BarrierMessage(1)) + if err != nil { + b.Fatal(err) + } + <-ch + } + }) + b.StopTimer() + cancel() + p.Wait() + } + }) +} diff --git a/pkg/pipeline/runner.go b/pkg/pipeline/runner.go index 5969fb024e1..30cabf01a75 100644 --- a/pkg/pipeline/runner.go +++ b/pkg/pipeline/runner.go @@ -21,7 +21,7 @@ import ( type runner interface { run(ctx context.Context) error - getOutputCh() chan *Message + getOutputCh() chan Message getNode() Node getName() string } @@ -30,7 +30,7 @@ type nodeRunner struct { name string node Node previous runner - outputCh chan *Message + outputCh chan Message } func newNodeRunner(name string, node Node, previous runner, outputChanSize int) *nodeRunner { @@ -38,12 +38,12 @@ func newNodeRunner(name string, node Node, previous runner, outputChanSize int) name: name, node: node, previous: previous, - outputCh: make(chan *Message, outputChanSize), + outputCh: make(chan Message, outputChanSize), } } func (r *nodeRunner) run(ctx context.Context) error { - nodeCtx := newNodeContext(ctx, nil, r.outputCh) + nodeCtx := newNodeContext(ctx, Message{}, r.outputCh) defer close(r.outputCh) defer func() { err := r.node.Destroy(nodeCtx) @@ -65,7 +65,7 @@ func (r *nodeRunner) run(ctx context.Context) error { return nil } -func (r *nodeRunner) getOutputCh() chan *Message { +func (r *nodeRunner) getOutputCh() chan Message { return r.outputCh } @@ -77,7 +77,7 @@ func (r *nodeRunner) getName() string { return r.name } -type headRunner chan *Message +type headRunner chan Message func (h headRunner) getName() string { return "header" @@ -87,7 +87,7 @@ func (h headRunner) run(ctx context.Context) error { panic("unreachable") } -func (h headRunner) getOutputCh() chan *Message { +func (h headRunner) getOutputCh() chan Message { return h } diff --git a/pkg/pipeline/test.go b/pkg/pipeline/test.go index 9c4ef6e61c8..60d0ab73d02 100644 --- a/pkg/pipeline/test.go +++ b/pkg/pipeline/test.go @@ -19,18 +19,18 @@ import "github.com/pingcap/ticdc/pkg/context" // This function is only for testing. // Only `Node.Receive` will be called, other functions in `Node` will never be called. // When the `Receive` function of the `Node` returns an error, this function will return the message that caused the error and the error -func SendMessageToNode4Test(ctx context.Context, node Node, msgs []*Message, outputCh chan *Message) (*Message, error) { - nodeCtx := newNodeContext(ctx, nil, outputCh) +func SendMessageToNode4Test(ctx context.Context, node Node, msgs []Message, outputCh chan Message) (Message, error) { + nodeCtx := newNodeContext(ctx, Message{}, outputCh) for _, msg := range msgs { err := node.Receive(withMessage(nodeCtx, msg)) if err != nil { return msg, err } } - return nil, nil + return Message{}, nil } // MockNodeContext4Test creates a node context with a message and a output channel for tests. -func MockNodeContext4Test(ctx context.Context, msg *Message, outputCh chan *Message) NodeContext { +func MockNodeContext4Test(ctx context.Context, msg Message, outputCh chan Message) NodeContext { return newNodeContext(ctx, msg, outputCh) }