Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: fix memory accumulation when the owner consumes etcd updates slowly (#1225) #1228

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/ticdc/pkg/cyclic/mark"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Owner struct {
// record last time that flushes all changefeeds' replication status
lastFlushChangefeeds time.Time
flushChangefeedInterval time.Duration
feedChangeNotifier *notify.Notifier
}

const (
Expand Down Expand Up @@ -118,6 +120,7 @@ func NewOwner(
etcdClient: cli,
gcTTL: gcTTL,
flushChangefeedInterval: flushChangefeedInterval,
feedChangeNotifier: new(notify.Notifier),
}

return owner, nil
Expand Down Expand Up @@ -1015,12 +1018,13 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error {

ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
changedFeeds := o.watchFeedChange(ctx1)

ticker := time.NewTicker(tickTime)
defer ticker.Stop()
feedChangeReceiver, err := o.feedChangeNotifier.NewReceiver(tickTime)
if err != nil {
return err
}
defer feedChangeReceiver.Stop()
o.watchFeedChange(ctx1)

var err error
loop:
for {
select {
Expand All @@ -1032,8 +1036,7 @@ loop:
// Anyway we just break loop here to ensure the following destruction.
err = ctx.Err()
break loop
case <-changedFeeds:
case <-ticker.C:
case <-feedChangeReceiver.C:
}

err = o.run(ctx)
Expand Down Expand Up @@ -1087,16 +1090,16 @@ restart:
return nil
}

func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
output := make(chan struct{}, 1)
func (o *Owner) watchFeedChange(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
default:
}
wch := o.etcdClient.Client.Watch(ctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix())
cctx, cancel := context.WithCancel(ctx)
wch := o.etcdClient.Client.Watch(cctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix())

for resp := range wch {
if resp.Err() != nil {
Expand All @@ -1108,14 +1111,11 @@ func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
// majority logical. For now just to wakeup the main loop ASAP to reduce latency, the efficiency of etcd
// operations should be resolved in future release.

select {
case <-ctx.Done():
case output <- struct{}{}:
}
o.feedChangeNotifier.Notify()
}
cancel()
}
}()
return output
}

func (o *Owner) run(ctx context.Context) error {
Expand Down
90 changes: 90 additions & 0 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,3 +928,93 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) {
err = capture.etcdClient.Close()
c.Assert(err, check.IsNil)
}

// TestWatchFeedChange verifies that watchFeedChange wakes the owner through
// feedChangeNotifier when task positions change in etcd, and that a slow
// consumer does not accumulate a backlog of wakeups: notifications coalesce,
// so the consumer observes strictly fewer wakeups than etcd updates, and at
// most one notification remains pending after the consumer stops.
func (s *ownerSuite) TestWatchFeedChange(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
addr := "127.0.0.1:12034"
ctx = util.PutCaptureAddrInCtx(ctx, addr)
capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
&security.Credential{}, addr, &processorOpts{})
c.Assert(err, check.IsNil)
owner, err := NewOwner(ctx, nil, &security.Credential{}, capture.session,
DefaultCDCGCSafePointTTL, time.Millisecond*200)
c.Assert(err, check.IsNil)

var (
wg sync.WaitGroup
updateCount = 0 // task-position writes made to etcd by the producer
recvChangeCount = 0 // notifications observed by the consumer
)
ctx1, cancel1 := context.WithCancel(ctx)
wg.Add(1)
// Producer: keep bumping the task position in etcd until ctx1 is canceled;
// each successful write should eventually trigger a watch event.
go func() {
defer wg.Done()
changefeedID := "test-changefeed"
pos := &model.TaskPosition{CheckPointTs: 100, ResolvedTs: 102}
for {
select {
case <-ctx1.Done():
return
default:
}
pos.ResolvedTs++
pos.CheckPointTs++
updated, err := capture.etcdClient.PutTaskPositionOnChange(ctx1, changefeedID, capture.info.ID, pos)
if errors.Cause(err) == context.Canceled {
return
}
c.Assert(err, check.IsNil)
c.Assert(updated, check.IsTrue)
updateCount++
// sleep to avoid starving the other goroutines
time.Sleep(time.Millisecond)
}
}()

feedChangeReceiver, err := owner.feedChangeNotifier.NewReceiver(ownerRunInterval)
c.Assert(err, check.IsNil)
defer feedChangeReceiver.Stop()
owner.watchFeedChange(ctx)
wg.Add(1)
// Consumer: drain notifications deliberately slowly, simulating an owner
// main loop that falls behind the rate of etcd updates.
go func() {
defer func() {
// there could be one message remaining in the notification receiver, try to consume it
select {
case <-feedChangeReceiver.C:
default:
}
wg.Done()
}()
for {
select {
case <-ctx1.Done():
return
case <-feedChangeReceiver.C:
recvChangeCount++
// sleep to simulate some owner work
time.Sleep(time.Millisecond * 50)
}
}
}()

time.Sleep(time.Second * 2)
// cancel1 stops only the producer and consumer; watchFeedChange is bound to
// ctx and must keep running so the post-cancel drain check below is valid.
cancel1()
wg.Wait()
c.Assert(recvChangeCount, check.Greater, 0)
// Coalescing is the point of the fix: a slow consumer must see fewer
// wakeups than there were updates, rather than one buffered wakeup each.
c.Assert(recvChangeCount, check.Less, updateCount)
select {
case <-feedChangeReceiver.C:
c.Error("should not receive message from feed change chan any more")
default:
}

err = capture.etcdClient.Close()
if err != nil {
// Closing an etcd client whose watch context was canceled may surface
// context.Canceled; any other error is a real failure.
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
}
}