Skip to content

Commit

Permalink
owner: fix memory accumulated when owner consume etcd update slow (#1225
Browse files Browse the repository at this point in the history
) (#1228)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Dec 22, 2020
1 parent eed57c9 commit caf4db8
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 15 deletions.
30 changes: 15 additions & 15 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/ticdc/pkg/cyclic/mark"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Owner struct {
// record last time that flushes all changefeeds' replication status
lastFlushChangefeeds time.Time
flushChangefeedInterval time.Duration
feedChangeNotifier *notify.Notifier
}

const (
Expand Down Expand Up @@ -118,6 +120,7 @@ func NewOwner(
etcdClient: cli,
gcTTL: gcTTL,
flushChangefeedInterval: flushChangefeedInterval,
feedChangeNotifier: new(notify.Notifier),
}

return owner, nil
Expand Down Expand Up @@ -1015,12 +1018,13 @@ func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error {

ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
changedFeeds := o.watchFeedChange(ctx1)

ticker := time.NewTicker(tickTime)
defer ticker.Stop()
feedChangeReceiver, err := o.feedChangeNotifier.NewReceiver(tickTime)
if err != nil {
return err
}
defer feedChangeReceiver.Stop()
o.watchFeedChange(ctx1)

var err error
loop:
for {
select {
Expand All @@ -1032,8 +1036,7 @@ loop:
// Anyway we just break loop here to ensure the following destruction.
err = ctx.Err()
break loop
case <-changedFeeds:
case <-ticker.C:
case <-feedChangeReceiver.C:
}

err = o.run(ctx)
Expand Down Expand Up @@ -1087,16 +1090,16 @@ restart:
return nil
}

func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
output := make(chan struct{}, 1)
func (o *Owner) watchFeedChange(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
default:
}
wch := o.etcdClient.Client.Watch(ctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix())
cctx, cancel := context.WithCancel(ctx)
wch := o.etcdClient.Client.Watch(cctx, kv.TaskPositionKeyPrefix, clientv3.WithFilterDelete(), clientv3.WithPrefix())

for resp := range wch {
if resp.Err() != nil {
Expand All @@ -1108,14 +1111,11 @@ func (o *Owner) watchFeedChange(ctx context.Context) chan struct{} {
// majority logical. For now just to wakeup the main loop ASAP to reduce latency, the efficiency of etcd
// operations should be resolved in future release.

select {
case <-ctx.Done():
case output <- struct{}{}:
}
o.feedChangeNotifier.Notify()
}
cancel()
}
}()
return output
}

func (o *Owner) run(ctx context.Context) error {
Expand Down
90 changes: 90 additions & 0 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,3 +928,93 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) {
err = capture.etcdClient.Close()
c.Assert(err, check.IsNil)
}

// TestWatchFeedChange checks how task-position updates seen by
// watchFeedChange are delivered to a consumer that is slower than the writer.
// A producer goroutine keeps bumping a task position in etcd while a consumer
// goroutine reads the notifier's receiver channel and sleeps between reads.
// The test then asserts that at least one notification arrived, that fewer
// notifications than updates were delivered, and that no message is left
// pending on the receiver channel once both goroutines have stopped.
func (s *ownerSuite) TestWatchFeedChange(c *check.C) {
	defer testleak.AfterTest(c)()
	defer s.TearDownTest(c)

	captureAddr := "127.0.0.1:12034"
	rootCtx, cancelRoot := context.WithCancel(context.Background())
	defer cancelRoot()
	ctx := util.PutCaptureAddrInCtx(rootCtx, captureAddr)

	capture, err := NewCapture(ctx, []string{s.clientURL.String()}, nil,
		&security.Credential{}, captureAddr, &processorOpts{})
	c.Assert(err, check.IsNil)
	owner, err := NewOwner(ctx, nil, &security.Credential{}, capture.session,
		DefaultCDCGCSafePointTTL, 200*time.Millisecond)
	c.Assert(err, check.IsNil)

	var workers sync.WaitGroup
	// Each counter is written by exactly one goroutine and only read after
	// workers.Wait() returns.
	updates, notifications := 0, 0
	// workerCtx stops only the two helper goroutines; the watcher started
	// below is bound to ctx and keeps running after stopWorkers is called.
	workerCtx, stopWorkers := context.WithCancel(ctx)

	// Producer: write a strictly increasing task position as fast as the
	// 1ms pause allows.
	workers.Add(1)
	go func() {
		defer workers.Done()
		feedID := "test-changefeed"
		position := &model.TaskPosition{CheckPointTs: 100, ResolvedTs: 102}
		for workerCtx.Err() == nil {
			position.CheckPointTs++
			position.ResolvedTs++
			updated, err := capture.etcdClient.PutTaskPositionOnChange(workerCtx, feedID, capture.info.ID, position)
			if errors.Cause(err) == context.Canceled {
				return
			}
			c.Assert(err, check.IsNil)
			c.Assert(updated, check.IsTrue)
			updates++
			// sleep to avoid other goroutine starvation
			time.Sleep(time.Millisecond)
		}
	}()

	// The receiver is registered before the watcher goroutine is started.
	receiver, err := owner.feedChangeNotifier.NewReceiver(ownerRunInterval)
	c.Assert(err, check.IsNil)
	defer receiver.Stop()
	owner.watchFeedChange(ctx)

	// Consumer: count notifications, pausing 50ms after each one to simulate
	// owner work.
	workers.Add(1)
	go func() {
		// Deferred calls run last-in-first-out: the drain below executes
		// first, then workers.Done().
		defer workers.Done()
		defer func() {
			// there could be one message remaining in notification receiver, try to consume it
			select {
			case <-receiver.C:
			default:
			}
		}()
		for {
			select {
			case <-workerCtx.Done():
				return
			case <-receiver.C:
				notifications++
				time.Sleep(50 * time.Millisecond)
			}
		}
	}()

	time.Sleep(2 * time.Second)
	// use stopWorkers to avoid cancel the watchFeedChange
	stopWorkers()
	workers.Wait()

	c.Assert(notifications, check.Greater, 0)
	c.Assert(notifications, check.Less, updates)
	select {
	case <-receiver.C:
		c.Error("should not receive message from feed change chan any more")
	default:
	}

	// Closing the client may report a cancellation; any other error fails
	// the test.
	err = capture.etcdClient.Close()
	if err != nil {
		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
	}
}

0 comments on commit caf4db8

Please sign in to comment.