sink/mysql: adjust producer/consumer exit sequence to avoid goroutine leak #1929

Merged · 6 commits · Jun 5, 2021
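To make the title concrete, here is a minimal, self-contained Go sketch of the exit sequence the change adopts. The `txn` and `worker` types below are illustrative stand-ins rather than the actual ticdc types; the drain logic mirrors the `cleanup` function added in `cdc/sink/mysql.go` further down: the producer stops sending, then signals each worker's `closedCh`, and the worker drains its channel and releases every pending `FinishWg` so no caller stays blocked.

```go
package main

import (
	"fmt"
	"sync"
)

// txn stands in for model.SingleTableTxn: a unit of work whose sender may
// block on FinishWg until the worker has handled it.
type txn struct {
	FinishWg *sync.WaitGroup
}

type worker struct {
	txnCh    chan *txn
	closedCh chan struct{}
}

// cleanup mirrors the new mysqlSinkWorker.cleanup: wait until the producer
// has signalled closedCh, then drain txnCh and release every pending waiter
// so no goroutine is left blocked on FinishWg.
func (w *worker) cleanup() {
	<-w.closedCh
	for {
		select {
		case t := <-w.txnCh:
			if t.FinishWg != nil {
				t.FinishWg.Done()
			}
		default:
			return
		}
	}
}

func main() {
	w := &worker{
		txnCh:    make(chan *txn, 4),
		closedCh: make(chan struct{}, 1),
	}

	// A caller enqueues a txn and waits on FinishWg; if the worker's run
	// loop has already exited, only cleanup can unblock this waiter.
	var wg sync.WaitGroup
	wg.Add(1)
	w.txnCh <- &txn{FinishWg: &wg}

	// Exit sequence: (1) the producer stops sending, (2) it signals
	// closedCh, (3) the worker drains txnCh and marks FinishWg as Done.
	w.closedCh <- struct{}{}
	w.cleanup()

	wg.Wait() // returns immediately; nothing leaks
	fmt.Println("all pending txns released")
}
```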
6 changes: 0 additions & 6 deletions cdc/sink/manager_test.go
@@ -24,10 +24,8 @@ import (
"github.com/pingcap/errors"

"github.com/pingcap/check"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.uber.org/zap"
)

type managerSuite struct{}
@@ -48,9 +46,6 @@ func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab
func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
for _, row := range rows {
log.Info("rows in check sink", zap.Reflect("row", row))
}
c.rows = append(c.rows, rows...)
return nil
}
@@ -62,7 +57,6 @@ func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
log.Info("flush in check sink", zap.Uint64("resolved", resolvedTs))
var newRows []*model.RowChangedEvent
for _, row := range c.rows {
c.Assert(row.CommitTs, check.Greater, c.lastResolvedTs)
27 changes: 27 additions & 0 deletions cdc/sink/mysql.go
@@ -134,6 +134,11 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
}

func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.Receiver) {
defer func() {
for _, worker := range s.workers {
worker.closedCh <- struct{}{}
}
}()
for {
select {
case <-ctx.Done():
@@ -640,6 +645,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
log.Info("mysql sink receives redundant error", zap.Error(err))
}
}
worker.cleanup()
}()
}
return nil
@@ -712,6 +718,7 @@ type mysqlSinkWorker struct {
metricBucketSize prometheus.Counter
receiver *notify.Receiver
checkpointTs uint64
closedCh chan struct{}
}

func newMySQLSinkWorker(
@@ -728,6 +735,7 @@
metricBucketSize: metricBucketSize,
execDMLs: execDMLs,
receiver: receiver,
closedCh: make(chan struct{}, 1),
}
}

@@ -833,6 +841,25 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
}
}

// cleanup waits for a notification from closedCh and then consumes all remaining txns from txnCh.
// The exit sequence is:
// 1. the producer of txnCh (the sink.flushRowChangedEvents goroutine) exits
// 2. the goroutine in step 1 sends a notification to the closedCh of each sink worker
// 3. each sink worker receives the notification from closedCh and marks the FinishWg of any remaining txn as Done
func (w *mysqlSinkWorker) cleanup() {
<-w.closedCh
for {
select {
case txn := <-w.txnCh:
if txn.FinishWg != nil {
txn.FinishWg.Done()
}
default:
return
}
}
}

func (s *mysqlSink) Close() error {
s.execWaitNotifier.Close()
s.resolvedNotifier.Close()
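On the sink side, the diff above wires the sequence together roughly as sketched below, assuming (as the diff shows) that the flushRowChangedEvents goroutine is the only producer of txnCh: its deferred loop notifies every worker only after it has stopped sending, each worker goroutine calls cleanup after run returns, and closedCh is buffered with capacity 1 so the notification never blocks even if a worker has not yet reached cleanup. The types here are simplified stand-ins, not the real mysqlSink.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type sinkWorker struct {
	txnCh    chan *sync.WaitGroup
	closedCh chan struct{}
}

// run stands in for mysqlSinkWorker.run: it may return (on error or cancel)
// while txns are still queued in txnCh.
func (w *sinkWorker) run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// cleanup drains whatever the producer left behind once it has been told,
// via closedCh, that no more txns will arrive.
func (w *sinkWorker) cleanup() {
	<-w.closedCh
	for {
		select {
		case pending := <-w.txnCh:
			pending.Done()
		default:
			return
		}
	}
}

func main() {
	workers := []*sinkWorker{
		{txnCh: make(chan *sync.WaitGroup, 8), closedCh: make(chan struct{}, 1)},
		{txnCh: make(chan *sync.WaitGroup, 8), closedCh: make(chan struct{}, 1)},
	}
	ctx, cancel := context.WithCancel(context.Background())

	// Worker goroutines: run first, then cleanup, mirroring the
	// worker.cleanup() call added in createSinkWorkers.
	var workerWg sync.WaitGroup
	for _, w := range workers {
		w := w
		workerWg.Add(1)
		go func() {
			defer workerWg.Done()
			_ = w.run(ctx)
			w.cleanup()
		}()
	}

	// Producer: enqueue one txn, then notify every worker on the way out.
	// Because closedCh is buffered (capacity 1), the deferred sends never
	// block, even if a worker has not reached cleanup yet.
	var pending sync.WaitGroup
	func() {
		defer func() {
			for _, w := range workers {
				w.closedCh <- struct{}{}
			}
		}()
		pending.Add(1)
		workers[0].txnCh <- &pending
	}()

	cancel()
	workerWg.Wait()
	pending.Wait() // the queued txn was released by cleanup, not lost
	fmt.Println("shutdown complete without leaked waiters")
}
```

One reading of the design choice: draining txnCh is safer than closing it, because closedCh is only signalled after the sole producer has returned, so no further sends can race with the drain and txnCh never needs to be closed at all.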
73 changes: 73 additions & 0 deletions cdc/sink/mysql_test.go
@@ -272,6 +272,8 @@ func (s MySQLSinkSuite) TestMySQLSinkWorkerExitWithError(c *check.C) {
for _, txn := range txns1 {
w.appendTxn(cctx, txn)
}

// simulate notifying the sink worker to flush existing txns
var wg sync.WaitGroup
w.appendFinishTxn(&wg)
time.Sleep(time.Millisecond * 100)
@@ -280,7 +282,78 @@ func (s MySQLSinkSuite) TestMySQLSinkWorkerExitWithError(c *check.C) {
w.appendTxn(cctx, txn)
}
notifier.Notify()

// simulate sink shutdown and send the closed signal to the sink worker
w.closedCh <- struct{}{}
w.cleanup()

// the flush notification wait group should be done
wg.Wait()

cancel()
c.Assert(errg.Wait(), check.Equals, errExecFailed)
}

func (s MySQLSinkSuite) TestMySQLSinkWorkerExitCleanup(c *check.C) {
defer testleak.AfterTest(c)()
txns1 := []*model.SingleTableTxn{
{
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}},
},
{
CommitTs: 2,
Rows: []*model.RowChangedEvent{{CommitTs: 2}},
},
}
txns2 := []*model.SingleTableTxn{
{
CommitTs: 5,
Rows: []*model.RowChangedEvent{{CommitTs: 5}},
},
}

maxTxnRow := 1
ctx := context.Background()

errExecFailed := errors.New("sink worker exec failed")
notifier := new(notify.Notifier)
cctx, cancel := context.WithCancel(ctx)
receiver, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
w := newMySQLSinkWorker(maxTxnRow, 1, /*bucket*/
bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"),
receiver,
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
return errExecFailed
})
errg, cctx := errgroup.WithContext(cctx)
errg.Go(func() error {
err := w.run(cctx)
return err
})
for _, txn := range txns1 {
w.appendTxn(cctx, txn)
}

// sleep to let the txns be flushed by the worker tick
time.Sleep(time.Millisecond * 100)

// simulate more txns being sent to txnCh after the sink worker's run has exited
for _, txn := range txns2 {
w.appendTxn(cctx, txn)
}
var wg sync.WaitGroup
w.appendFinishTxn(&wg)
notifier.Notify()

// simulate sink shutdown and send the closed signal to the sink worker
w.closedCh <- struct{}{}
w.cleanup()

// the flush notification wait group should be done
wg.Wait()

cancel()
c.Assert(errg.Wait(), check.Equals, errExecFailed)
}
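One way to see why the new test deliberately appends txns after run has exited: until cleanup drains txnCh, any goroutine blocked on a txn's FinishWg can never proceed, which is exactly the leak that testleak.AfterTest flags. The snippet below is illustrative only (waitOrTimeout is a hypothetical helper, not ticdc code) and contrasts the pre-fix and post-fix behaviour.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitOrTimeout reports whether wg.Wait returned before the deadline. The new
// tests depend on wg.Wait() returning promptly once cleanup() has run; before
// this fix the equivalent wait hung forever, surfacing as a leaked goroutine.
func waitOrTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

func main() {
	txnCh := make(chan *sync.WaitGroup, 1)

	// A txn enqueued after the worker's run loop has already exited.
	var wg sync.WaitGroup
	wg.Add(1)
	txnCh <- &wg

	// Pre-fix behaviour: nobody drains txnCh, so the waiter times out.
	fmt.Println("released without cleanup:", waitOrTimeout(&wg, 50*time.Millisecond))

	// Post-fix behaviour: draining the channel (what cleanup does) releases it.
drain:
	for {
		select {
		case pending := <-txnCh:
			pending.Done()
		default:
			break drain
		}
	}
	fmt.Println("released after cleanup:", waitOrTimeout(&wg, 50*time.Millisecond))
}
```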