Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink: add barrier in underlying sink to support accruate stop #2417

Merged
merged 14 commits into from
Aug 10, 2021
6 changes: 5 additions & 1 deletion cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,8 +1009,12 @@ func (c *changeFeed) Close() {
}
}

ctx, cancel := context.WithCancel(context.Background())
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

if c.sink != nil {
err := c.sink.Close()
// pass a canceled context is enough, since the Close of backend sink
// here doesn't use context actually.
err := c.sink.Close(ctx)
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))
}
Expand Down
4 changes: 3 additions & 1 deletion cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,9 @@ func (o *Owner) newChangeFeed(
}
defer func() {
if resultErr != nil && primarySink != nil {
primarySink.Close()
// The Close of backend sink here doesn't use context, it is ok to pass
// a canceled context here.
primarySink.Close(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to pass in a cancelled context here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment

}
}()
go func() {
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type AsyncSink interface {
// the caller of this function can call again and again until a true returned
EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error)
SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error
Close() error
Close(ctx context.Context) error
}

type asyncSinkImpl struct {
Expand Down Expand Up @@ -181,9 +181,9 @@ func (s *asyncSinkImpl) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint6
return s.syncpointStore.SinkSyncpoint(ctx, ctx.ChangefeedVars().ID, checkpointTs)
}

func (s *asyncSinkImpl) Close() (err error) {
func (s *asyncSinkImpl) Close(ctx context.Context) (err error) {
s.cancel()
err = s.sink.Close()
err = s.sink.Close(ctx)
if s.syncpointStore != nil {
err = s.syncpointStore.Close()
}
Expand Down
14 changes: 9 additions & 5 deletions cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ func (m *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return m.ddlError
}

func (m *mockSink) Close() error {
func (m *mockSink) Close(ctx context.Context) error {
return nil
}

func (m *mockSink) Barrier(ctx context.Context) error {
return nil
}

Expand All @@ -88,7 +92,7 @@ func (s *asyncSinkSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mockSink := newAsyncSink4Test(ctx, c)
defer sink.Close()
defer sink.Close(ctx)
tableInfos := []*model.SimpleTableInfo{{Schema: "test"}}
err := sink.Initialize(ctx, tableInfos)
c.Assert(err, check.IsNil)
Expand All @@ -99,7 +103,7 @@ func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mSink := newAsyncSink4Test(ctx, c)
defer sink.Close()
defer sink.Close(ctx)

waitCheckpointGrowingUp := func(m *mockSink, targetTs model.Ts) error {
return retry.Do(context.Background(), func() error {
Expand All @@ -119,7 +123,7 @@ func (s *asyncSinkSuite) TestExecDDL(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mSink := newAsyncSink4Test(ctx, c)
defer sink.Close()
defer sink.Close(ctx)
ddl1 := &model.DDLEvent{CommitTs: 1}
for {
done, err := sink.EmitDDLEvent(ctx, ddl1)
Expand Down Expand Up @@ -170,7 +174,7 @@ func (s *asyncSinkSuite) TestExecDDLError(c *check.C) {
return nil
})
ctx, sink, mSink := newAsyncSink4Test(ctx, c)
defer sink.Close()
defer sink.Close(ctx)
mSink.ddlError = cerror.ErrDDLEventIgnored.GenWithStackByArgs()
ddl1 := &model.DDLEvent{CommitTs: 1}
for {
Expand Down
5 changes: 4 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ func (c *changefeed) releaseResources() {
c.cancel = func() {}
c.ddlPuller.Close()
c.schema = nil
if err := c.sink.Close(); err != nil {
ctx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait sink Close, pass a canceled context is ok
if err := c.sink.Close(ctx); err != nil {
log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
}
c.wg.Wait()
Expand Down
6 changes: 5 additions & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func (m *mockAsyncSink) EmitCheckpointTs(ctx cdcContext.Context, ts uint64) {
atomic.StoreUint64(&m.checkpointTs, ts)
}

func (m *mockAsyncSink) Close() error {
func (m *mockAsyncSink) Close(ctx context.Context) error {
return nil
}

func (m *mockAsyncSink) Barrier(ctx context.Context) error {
return nil
}

Expand Down
8 changes: 6 additions & 2 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
return m.checkpointError
}

func (m *mockSink) Close() error {
func (m *mockSink) Close(ctx context.Context) error {
return nil
}

func (m *mockSink) Barrier(ctx context.Context) error {
return nil
}

Expand Down Expand Up @@ -902,7 +906,7 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
errCh := make(chan error, 1)
sink, err := sink.NewSink(ctx, cfID, "blackhole://", f, replicaConf, map[string]string{}, errCh)
c.Assert(err, check.IsNil)
defer sink.Close() //nolint:errcheck
defer sink.Close(cctx) //nolint:errcheck
sampleCF.sink = sink

capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil, nil)
Expand Down
6 changes: 3 additions & 3 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,10 +899,10 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
table.cancel = func() {
cancel()
if tableSink != nil {
tableSink.Close()
tableSink.Close(ctx)
}
if mTableSink != nil {
mTableSink.Close()
mTableSink.Close(ctx)
}
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Inc()
Expand Down Expand Up @@ -1191,7 +1191,7 @@ func (p *oldProcessor) stop(ctx context.Context) error {
if err := p.etcdCli.DeleteTaskWorkload(ctx, p.changefeedID, p.captureInfo.ID); err != nil {
return err
}
return p.sinkManager.Close()
return p.sinkManager.Close(ctx)
}

func (p *oldProcessor) isStopped() bool {
Expand Down
28 changes: 19 additions & 9 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,27 @@ func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
return nil
}

// stop is called when sink receives a stop command or checkpointTs reaches targetTs.
// In this method, the builtin table sink will be closed by calling `Close`, and
// no more events can be sent to this sink node afterwards.
func (n *sinkNode) stop(ctx pipeline.NodeContext) (err error) {
n.status.Store(TableStatusStopped)
err = n.sink.Close(ctx)
if err != nil {
return
}
err = cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
return
}

func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err error) {
defer func() {
if err != nil {
n.status.Store(TableStatusStopped)
return
}
if n.checkpointTs >= n.targetTs {
n.status.Store(TableStatusStopped)
err = n.sink.Close()
if err != nil {
err = errors.Trace(err)
return
}
err = cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
err = n.stop(ctx)
}
}()
if resolvedTs > n.barrierTs {
Expand Down Expand Up @@ -270,6 +277,9 @@ func (n *sinkNode) flushRow2Sink(ctx pipeline.NodeContext) error {

// Receive receives the message from the previous node
func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
if n.status == TableStatusStopped {
return cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
}
msg := ctx.Message()
switch msg.Tp {
case pipeline.MessageTypePolymorphicEvent:
Expand Down Expand Up @@ -301,7 +311,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
"the table pipeline can't be stopped accurately, will be stopped soon",
zap.Uint64("stoppedTs", msg.Command.StoppedTs), zap.Uint64("checkpointTs", n.checkpointTs))
}
n.targetTs = msg.Command.StoppedTs
return n.stop(ctx)
}
case pipeline.MessageTypeBarrier:
n.barrierTs = msg.BarrierTs
Expand All @@ -315,5 +325,5 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
n.status.Store(TableStatusStopped)
n.flowController.Abort()
return n.sink.Close()
return n.sink.Close(ctx)
}
22 changes: 14 additions & 8 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (s *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
panic("unreachable")
}

func (s *mockSink) Close() error {
func (s *mockSink) Close(ctx context.Context) error {
return nil
}

func (s *mockSink) Barrier(ctx context.Context) error {
return nil
}

Expand Down Expand Up @@ -155,15 +159,16 @@ func (s *outputSuite) TestStatus(c *check.C) {
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusRunning)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStopAtTs, StoppedTs: 6}), nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusRunning)
err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStopAtTs, StoppedTs: 6}), nil))
c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue)
c.Assert(node.Status(), check.Equals, TableStatusStopped)

err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil))
c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue)
c.Assert(node.Status(), check.Equals, TableStatusStopped)
c.Assert(node.CheckpointTs(), check.Equals, uint64(6))
c.Assert(node.CheckpointTs(), check.Equals, uint64(2))

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
Expand All @@ -177,9 +182,10 @@ func (s *outputSuite) TestStatus(c *check.C) {
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusRunning)

c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStopAtTs, StoppedTs: 6}), nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusRunning)
err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.CommandMessage(&pipeline.Command{Tp: pipeline.CommandTypeStopAtTs, StoppedTs: 6}), nil))
c.Assert(cerrors.ErrTableProcessorStoppedSafely.Equal(err), check.IsTrue)
c.Assert(node.Status(), check.Equals, TableStatusStopped)

err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pipeline.PolymorphicEventMessage(&model.PolymorphicEvent{CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, Row: &model.RowChangedEvent{}}), nil))
Expand Down
5 changes: 4 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,10 @@ func (p *processor) Close() error {
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
if p.sinkManager != nil {
return p.sinkManager.Close()
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
cancel()
return p.sinkManager.Close(ctx)
}
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.Simpl
return nil
}

func (b *blackHoleSink) Close() error {
func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}

func (b *blackHoleSink) Barrier(ctx context.Context) error {
return nil
}
8 changes: 7 additions & 1 deletion cdc/sink/cdclog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,13 @@ func (f *fileSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTabl
return nil
}

func (f *fileSink) Close() error {
func (f *fileSink) Close(ctx context.Context) error {
return nil
}

func (f *fileSink) Barrier(ctx context.Context) error {
// Barrier does nothing because FlushRowChangedEvents in file sink has flushed
// all buffered events forcedlly.
return nil
}

Expand Down
8 changes: 7 additions & 1 deletion cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,13 @@ func (s *s3Sink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableI
return nil
}

func (s *s3Sink) Close() error {
func (s *s3Sink) Close(ctx context.Context) error {
return nil
}

func (s *s3Sink) Barrier(ctx context.Context) error {
// Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed
// all buffered events forcedlly.
return nil
}

Expand Down
Loading