Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline: set table status to stopped after sink is closed #2716

Merged
merged 6 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/ticdc/cdc/sink"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/pipeline"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -104,7 +103,8 @@ func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
// In this method, the builtin table sink will be closed by calling `Close`, and
// no more events can be sent to this sink node afterwards.
func (n *sinkNode) stop(ctx pipeline.NodeContext) (err error) {
n.status.Store(TableStatusStopped)
// table stopped status must be set after underlying sink is closed
defer n.status.Store(TableStatusStopped)
err = n.sink.Close(ctx)
if err != nil {
return
Expand Down Expand Up @@ -319,12 +319,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
return errors.Trace(err)
}
case pipeline.MessageTypeCommand:
if msg.Command.Tp == pipeline.CommandTypeStopAtTs {
if msg.Command.StoppedTs < n.checkpointTs {
log.Warn("the stopped ts is less than the checkpoint ts, "+
"the table pipeline can't be stopped accurately, will be stopped soon",
zap.Uint64("stoppedTs", msg.Command.StoppedTs), zap.Uint64("checkpointTs", n.checkpointTs))
}
if msg.Command.Tp == pipeline.CommandTypeStop {
return n.stop(ctx)
}
case pipeline.MessageTypeBarrier:
Expand Down
57 changes: 55 additions & 2 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -106,6 +107,20 @@ func (s *mockSink) Reset() {
s.received = s.received[:0]
}

type mockCloseControlSink struct {
mockSink
closeCh chan interface{}
}

func (s *mockCloseControlSink) Close(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.closeCh:
return nil
}
}

type outputSuite struct{}

var _ = check.Suite(&outputSuite{})
Expand Down Expand Up @@ -160,7 +175,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.Status(), check.Equals, TableStatusRunning)

err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStopAtTs, StoppedTs: 6}), nil))
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStop}), nil))
c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue)
c.Assert(node.Status(), check.Equals, TableStatusStopped)

Expand All @@ -183,7 +198,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.Status(), check.Equals, TableStatusRunning)

err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStopAtTs, StoppedTs: 6}), nil))
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStop}), nil))
c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue)
c.Assert(node.Status(), check.Equals, TableStatusStopped)

Expand All @@ -194,6 +209,44 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(7))
}

// TestStopStatus tests the table status of a pipeline is not set to stopped
// until the underlying sink is closed
func (s *outputSuite) TestStopStatus(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test-status",
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
},
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusRunning)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// This will block until sink Close returns
err := node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStop}), nil))
c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue)
c.Assert(node.Status(), check.Equals, TableStatusStopped)
}()
// wait to ensure stop message is sent to the sink node
time.Sleep(time.Millisecond * 50)
c.Assert(node.Status(), check.Equals, TableStatusRunning)
closeCh <- struct{}{}
wg.Wait()
}

func (s *outputSuite) TestManyTs(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
Expand Down
3 changes: 1 addition & 2 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ func (t *tablePipelineImpl) UpdateBarrierTs(ts model.Ts) {
// AsyncStop tells the pipeline to stop, and returns true is the pipeline is already stopped.
func (t *tablePipelineImpl) AsyncStop(targetTs model.Ts) bool {
err := t.p.SendToFirstNode(pipeline.CommandMessage(&pipeline.Command{
Tp: pipeline.CommandTypeStopAtTs,
StoppedTs: targetTs,
Tp: pipeline.CommandTypeStop,
}))
log.Info("send async stop signal to table", zap.Int64("tableID", t.tableID), zap.Uint64("targetTs", targetTs))
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions pkg/pipeline/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,11 @@ type CommandType int
const (
// CommandTypeUnknown is unknown message type
CommandTypeUnknown CommandType = iota
// CommandTypeStopAtTs means the table pipeline should stop at the specified Ts
CommandTypeStopAtTs
// CommandTypeStop means the table pipeline should stop at once
CommandTypeStop
)

// Command is the command about table pipeline
type Command struct {
Tp CommandType
StoppedTs model.Ts
Tp CommandType
}