Skip to content

Commit

Permalink
pipeline: use Message value to reduce GC pressure (#2415) (#2441)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 2, 2021
1 parent c60f0f7 commit 2c66164
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 51 deletions.
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/cyclic_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
},
})
n := newCyclicMarkNode(markTableID)
err := n.Init(pipeline.MockNodeContext4Test(ctx, nil, nil))
err := n.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil))
c.Assert(err, check.IsNil)
outputCh := make(chan *pipeline.Message)
outputCh := make(chan pipeline.Message)
var wg sync.WaitGroup
wg.Add(2)
go func() {
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (n *mounterNode) Init(ctx pipeline.NodeContext) error {
}

for _, msg := range msgs {
msg := msg.(*pipeline.Message)
msg := msg.(pipeline.Message)
if msg.Tp != pipeline.MessageTypePolymorphicEvent {
// sends the control message directly to the next node
ctx.SendToNextNode(msg)
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) {
p.AppendNode(ctx, "check", checkNode)

var sentCount int64
sendMsg := func(p *pipeline.Pipeline, msg *pipeline.Message) {
sendMsg := func(p *pipeline.Pipeline, msg pipeline.Message) {
err := retry.Do(context.Background(), func() error {
return p.SendToFirstNode(msg)
}, retry.WithBackoffBaseDelay(10), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(100))
Expand All @@ -123,7 +123,7 @@ func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) {
go func() {
defer wg.Done()
for i := 0; i < basicsTestMessageCount; i++ {
var msg *pipeline.Message
var msg pipeline.Message
if i%100 == 0 {
// generates a control message
msg = pipeline.TickMessage()
Expand Down
12 changes: 6 additions & 6 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand All @@ -145,7 +145,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand All @@ -167,7 +167,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, pipeline.BarrierMessage(20), nil)), check.IsNil)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -316,7 +316,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down
20 changes: 10 additions & 10 deletions pkg/pipeline/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,46 @@ type NodeContext interface {
context.Context

// Message returns the message sent by the previous node
Message() *Message
Message() Message
// SendToNextNode sends the message to the next node
SendToNextNode(msg *Message)
SendToNextNode(msg Message)
}

type nodeContext struct {
context.Context
msg *Message
outputCh chan *Message
msg Message
outputCh chan Message
}

func newNodeContext(ctx context.Context, msg *Message, outputCh chan *Message) NodeContext {
func newNodeContext(ctx context.Context, msg Message, outputCh chan Message) NodeContext {
return &nodeContext{
Context: ctx,
msg: msg,
outputCh: outputCh,
}
}

func (ctx *nodeContext) Message() *Message {
func (ctx *nodeContext) Message() Message {
return ctx.msg
}

func (ctx *nodeContext) SendToNextNode(msg *Message) {
func (ctx *nodeContext) SendToNextNode(msg Message) {
// The header channel should never be blocked
ctx.outputCh <- msg
}

type messageContext struct {
NodeContext
message *Message
message Message
}

func withMessage(ctx NodeContext, msg *Message) NodeContext {
func withMessage(ctx NodeContext, msg Message) NodeContext {
return messageContext{
NodeContext: ctx,
message: msg,
}
}

func (ctx messageContext) Message() *Message {
func (ctx messageContext) Message() Message {
return ctx.message
}
24 changes: 10 additions & 14 deletions pkg/pipeline/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,39 +41,35 @@ type Message struct {
}

// PolymorphicEventMessage creates the message of PolymorphicEvent
func PolymorphicEventMessage(event *model.PolymorphicEvent) *Message {
return &Message{
func PolymorphicEventMessage(event *model.PolymorphicEvent) Message {
return Message{
Tp: MessageTypePolymorphicEvent,
PolymorphicEvent: event,
}
}

// CommandMessage creates the message of Command
func CommandMessage(command *Command) *Message {
return &Message{
func CommandMessage(command *Command) Message {
return Message{
Tp: MessageTypeCommand,
Command: command,
}
}

// BarrierMessage creates the message of Command
func BarrierMessage(barrierTs model.Ts) *Message {
return &Message{
func BarrierMessage(barrierTs model.Ts) Message {
return Message{
Tp: MessageTypeBarrier,
BarrierTs: barrierTs,
}
}

// TickMessage is called frequently,
// to ease GC pressure we return a global variable.
var tickMsg *Message = &Message{
Tp: MessageTypeTick,
}

// TickMessage creates the message of Tick
// Note: the returned message is READ-ONLY.
func TickMessage() *Message {
return tickMsg
func TickMessage() Message {
return Message{
Tp: MessageTypeTick,
}
}

// CommandType is the type of Command
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (p *Pipeline) driveRunner(ctx context.Context, previousRunner, runner runne
var pipelineTryAgainError error = cerror.ErrPipelineTryAgain.FastGenByArgs()

// SendToFirstNode sends the message to the first node
func (p *Pipeline) SendToFirstNode(msg *Message) error {
func (p *Pipeline) SendToFirstNode(msg Message) error {
p.closeMu.Lock()
defer p.closeMu.Unlock()
if p.isClosed {
Expand Down
73 changes: 69 additions & 4 deletions pkg/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
stdCtx "context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -76,7 +77,7 @@ func (e echoNode) Destroy(ctx NodeContext) error {

type checkNode struct {
c *check.C
expected []*Message
expected []Message
index int
}

Expand Down Expand Up @@ -112,7 +113,7 @@ func (s *pipelineSuite) TestPipelineUsage(c *check.C) {
p.AppendNode(ctx, "echo node", echoNode{})
p.AppendNode(ctx, "check node", &checkNode{
c: c,
expected: []*Message{
expected: []Message{
PolymorphicEventMessage(&model.PolymorphicEvent{
Row: &model.RowChangedEvent{
Table: &model.TableName{
Expand Down Expand Up @@ -225,7 +226,7 @@ func (s *pipelineSuite) TestPipelineError(c *check.C) {
p.AppendNode(ctx, "error node", &errorNode{c: c})
p.AppendNode(ctx, "check node", &checkNode{
c: c,
expected: []*Message{
expected: []Message{
PolymorphicEventMessage(&model.PolymorphicEvent{
Row: &model.RowChangedEvent{
Table: &model.TableName{
Expand Down Expand Up @@ -380,7 +381,7 @@ func (s *pipelineSuite) TestPipelineAppendNode(c *check.C) {

p.AppendNode(ctx, "check node", &checkNode{
c: c,
expected: []*Message{
expected: []Message{
PolymorphicEventMessage(&model.PolymorphicEvent{
Row: &model.RowChangedEvent{
Table: &model.TableName{
Expand Down Expand Up @@ -475,3 +476,67 @@ func (s *pipelineSuite) TestPipelinePanic(c *check.C) {
p.AppendNode(ctx, "panic", panicNode{})
p.Wait()
}

type forward struct {
ch chan Message
}

func (n *forward) Init(ctx NodeContext) error {
return nil
}

func (n *forward) Receive(ctx NodeContext) error {
m := ctx.Message()
if n.ch != nil {
n.ch <- m
} else {
ctx.SendToNextNode(m)
}
return nil
}

func (n *forward) Destroy(ctx NodeContext) error {
return nil
}

// Run the benchmark
// go test -benchmem -run=^$ -bench ^(BenchmarkPipeline)$ github.com/pingcap/ticdc/pkg/pipeline
func BenchmarkPipeline(b *testing.B) {
ctx := context.NewContext(stdCtx.Background(), &context.GlobalVars{})
runnersSize, outputChannelSize := 2, 64

b.Run("BenchmarkPipeline", func(b *testing.B) {
for i := 1; i <= 8; i++ {
ctx, cancel := context.WithCancel(ctx)
ctx = context.WithErrorHandler(ctx, func(err error) error {
b.Fatal(err)
return err
})

ch := make(chan Message)
p := NewPipeline(ctx, -1, runnersSize, outputChannelSize)
for j := 0; j < i; j++ {
if (j + 1) == i {
// The last node
p.AppendNode(ctx, "forward node", &forward{ch: ch})
} else {
p.AppendNode(ctx, "forward node", &forward{})
}
}

b.ResetTimer()
b.Run(fmt.Sprintf("%d node(s)", i), func(b *testing.B) {
for i := 0; i < b.N; i++ {
err := p.SendToFirstNode(BarrierMessage(1))
if err != nil {
b.Fatal(err)
}
<-ch
}
})
b.StopTimer()
cancel()
p.Wait()
}
})
}
14 changes: 7 additions & 7 deletions pkg/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

type runner interface {
run(ctx context.Context) error
getOutputCh() chan *Message
getOutputCh() chan Message
getNode() Node
getName() string
}
Expand All @@ -30,20 +30,20 @@ type nodeRunner struct {
name string
node Node
previous runner
outputCh chan *Message
outputCh chan Message
}

func newNodeRunner(name string, node Node, previous runner, outputChanSize int) *nodeRunner {
return &nodeRunner{
name: name,
node: node,
previous: previous,
outputCh: make(chan *Message, outputChanSize),
outputCh: make(chan Message, outputChanSize),
}
}

func (r *nodeRunner) run(ctx context.Context) error {
nodeCtx := newNodeContext(ctx, nil, r.outputCh)
nodeCtx := newNodeContext(ctx, Message{}, r.outputCh)
defer close(r.outputCh)
defer func() {
err := r.node.Destroy(nodeCtx)
Expand All @@ -65,7 +65,7 @@ func (r *nodeRunner) run(ctx context.Context) error {
return nil
}

func (r *nodeRunner) getOutputCh() chan *Message {
func (r *nodeRunner) getOutputCh() chan Message {
return r.outputCh
}

Expand All @@ -77,7 +77,7 @@ func (r *nodeRunner) getName() string {
return r.name
}

type headRunner chan *Message
type headRunner chan Message

func (h headRunner) getName() string {
return "header"
Expand All @@ -87,7 +87,7 @@ func (h headRunner) run(ctx context.Context) error {
panic("unreachable")
}

func (h headRunner) getOutputCh() chan *Message {
func (h headRunner) getOutputCh() chan Message {
return h
}

Expand Down
Loading

0 comments on commit 2c66164

Please sign in to comment.