owner: fix resource cleanup when an owner exits (pingcap#528)
amyangfei authored May 8, 2020
1 parent df2565c commit fa54e00
Showing 4 changed files with 113 additions and 56 deletions.
28 changes: 14 additions & 14 deletions cdc/capture.go
@@ -20,11 +20,13 @@ import (

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
@@ -115,19 +117,16 @@ func (c *Capture) Run(ctx context.Context) (err error) {
var ev *TaskEvent
wch := taskWatcher.Watch(ctx)
for {
// Panic when the session is done unexpectedly, it means the
// Return error when the session is done unexpectedly, it means the
// server does not send heartbeats in time, or network interrupted
// In this case, the state of the capture is undermined,
// the task may have or have not been rebalanced, the owner
// may be or not be held.
// When a panic happens, the routine will immediately starts to unwind
// the call stack until the whole program crashes or the built-in recover
// function is called, we use recover in server stack and starts a new
// server main loop, and we cancel context here to let all sub routines exit
// In this case, the state of the capture is undetermined, the task may
// have or have not been rebalanced, the owner may or may not be held,
// so we must cancel the context to let all sub routines exit.
select {
case <-c.session.Done():
if ctx.Err() != context.Canceled {
c.Suicide()
log.Info("capture session done, capture suicide itself", zap.String("capture", c.info.ID))
return errors.Trace(ErrSuicide)
}
case ev = <-wch:
if ev == nil {
@@ -145,19 +144,20 @@ func (c *Capture) Run(ctx context.Context) (err error) {

// Campaign to be an owner
func (c *Capture) Campaign(ctx context.Context) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return c.election.Campaign(ctx, c.info.ID)
}

// Resign lets a owner start a new election.
func (c *Capture) Resign(ctx context.Context) error {
failpoint.Inject("capture-resign-failed", func() {
failpoint.Return(errors.New("capture resign failed"))
})
return c.election.Resign(ctx)
}

// Suicide kills the capture itself
func (c *Capture) Suicide() {
panic(ErrSuicide)
}

// Cleanup cleans all dynamic resources
func (c *Capture) Cleanup() {
c.procLock.Lock()
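The substance of the capture.go change is replacing the old panic-based Suicide/recover flow with an ordinary sentinel error: Capture.Run now returns ErrSuicide and the caller decides whether to restart. Below is a minimal, self-contained sketch of that pattern, not the TiCDC code itself; the names errSuicide, runOnce, and watchSession are invented for illustration.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/pingcap/errors"
)

// errSuicide is a sentinel error: it tells the caller that this run cannot
// continue and the whole capture loop should be restarted.
var errSuicide = errors.New("capture suicide")

// watchSession simulates an etcd session keep-alive channel that closes
// shortly after it is created.
func watchSession() <-chan struct{} {
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(done)
	}()
	return done
}

// runOnce returns errSuicide (wrapped with a stack trace) instead of
// panicking when the session is lost, so deferred cleanup in callers still
// runs on the normal error path.
func runOnce(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-watchSession():
		return errors.Trace(errSuicide)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	for {
		err := runOnce(ctx)
		// errors.Cause unwraps errors.Trace, mirroring the check in Server.Run.
		if errors.Cause(err) != errSuicide {
			log.Printf("exit with error: %v", err)
			return
		}
		log.Println("capture suicided, restarting")
	}
}
```

The point of the pattern is that deferred cleanup in every caller runs on the normal error path, instead of relying on a recover() placed high up the stack.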
4 changes: 4 additions & 0 deletions cdc/owner.go
@@ -498,6 +498,10 @@ func (o *Owner) Close(ctx context.Context, stepDown func(ctx context.Context) er
// Run the owner
// TODO avoid this tick style, this means we get `tickTime` latency here.
func (o *Owner) Run(ctx context.Context, tickTime time.Duration) error {
failpoint.Inject("owner-run-with-error", func() {
failpoint.Return(errors.New("owner run with injected error"))
})

ctx, cancel := context.WithCancel(ctx)
defer cancel()

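The only change in owner.go is a failpoint marker that lets the availability test force Owner.Run to fail once. As I understand the pingcap/failpoint workflow, Inject is a no-op marker until failpoint-ctl rewrites the source into a failpoint.Eval call, and a specific failpoint is then activated at runtime either through the GO_FAILPOINTS environment variable (as the test below does with 1*return(true)) or programmatically with failpoint.Enable. A hedged sketch that calls Eval directly; the path example.com/demo/owner-run-with-error is invented, the real path is the package path plus the marker name, e.g. github.com/pingcap/ticdc/cdc/owner-run-with-error.

```go
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// Invented failpoint path for this sketch.
const fpOwnerRun = "example.com/demo/owner-run-with-error"

// runOwner mimics the marker added to Owner.Run: when the failpoint fires,
// it returns an injected error instead of doing real work.
func runOwner() error {
	// err == nil means the failpoint is enabled and triggered this time;
	// this is roughly what the rewritten failpoint.Inject marker expands to.
	if _, err := failpoint.Eval(fpOwnerRun); err == nil {
		return fmt.Errorf("owner run with injected error")
	}
	return nil
}

func main() {
	// Roughly equivalent to GO_FAILPOINTS='example.com/demo/owner-run-with-error=1*return(true)':
	// the "1*" term should make the failpoint fire exactly once.
	if err := failpoint.Enable(fpOwnerRun, "1*return(true)"); err != nil {
		panic(err)
	}
	fmt.Println(runOwner()) // expected: the injected error
	fmt.Println(runOwner()) // expected: <nil>, the 1* budget is used up
}
```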
96 changes: 54 additions & 42 deletions cdc/server.go
@@ -22,7 +22,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/util"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
@@ -119,10 +121,50 @@ func (s *Server) Run(ctx context.Context) error {

// When a capture suicides, restart it
for {
if err := s.run(ctx); err != ErrSuicide {
if err := s.run(ctx); errors.Cause(err) != ErrSuicide {
return err
}
log.Info("server recovered")
log.Info("server recovered", zap.String("capture", s.capture.info.ID))
}
}

func (s *Server) campaignOwnerLoop(ctx context.Context) error {
// In most failure cases, we don't return error directly, just run another
// campaign loop. We treat campaign loop as a special background routine.
for {
// Campaign to be an owner, it blocks until it becomes the owner
if err := s.capture.Campaign(ctx); err != nil {
switch errors.Cause(err) {
case context.Canceled:
return nil
case mvcc.ErrCompacted:
continue
}
log.Warn("campaign owner failed", zap.Error(err))
continue
}
log.Info("campaign owner successfully", zap.String("capture", s.capture.info.ID))
owner, err := NewOwner(s.capture.session, s.opts.gcTTL)
if err != nil {
log.Warn("create new owner failed", zap.Error(err))
continue
}

s.owner = owner
if err := owner.Run(ctx, ownerRunInterval); err != nil {
if errors.Cause(err) == context.Canceled {
log.Info("owner exited", zap.String("capture", s.capture.info.ID))
return nil
}
err2 := s.capture.Resign(ctx)
if err2 != nil {
// if resigning the owner failed, return error to let the capture exit
return errors.Annotatef(err2, "resign owner failed, capture: %s", s.capture.info.ID)
}
log.Warn("run owner failed", zap.Error(err))
}
// owner is resigned by API, reset owner and continue the campaign loop
s.owner = nil
}
}

@@ -135,49 +177,19 @@ func (s *Server) run(ctx context.Context) (err error) {
ctx = util.PutCaptureIDInCtx(ctx, s.capture.info.ID)
ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone)
ctx, cancel := context.WithCancel(ctx)

// when a goroutine paniced, cancel would be called first, which
// cancels all the normal goroutines, and then the defered recover
// is called, which modifies the err value to ErrSuicide. The caller
// would restart this function when an error is ErrSuicide.
defer func() {
if r := recover(); r == ErrSuicide {
log.Error("server suicided")
// assign the error value, which should be handled by
// the parent caller
err = ErrSuicide
} else if r != nil {
log.Error("server exited with panic", zap.Reflect("panic info", r))
}
}()
defer cancel()

go func() {
for {
// Campaign to be an owner, it blocks until it becomes
// the owner
if err := s.capture.Campaign(ctx); err != nil {
log.Error("campaign failed", zap.Error(err))
return
}
owner, err := NewOwner(s.capture.session, s.opts.gcTTL)
if err != nil {
log.Error("new owner failed", zap.Error(err))
return
}
s.owner = owner
if err := owner.Run(ctx, ownerRunInterval); err != nil {
if errors.Cause(err) != context.Canceled {
log.Error("run owner failed", zap.Error(err))
} else {
log.Info("owner exited")
}
return
}
}
wg, cctx := errgroup.WithContext(ctx)

wg.Go(func() error {
return s.campaignOwnerLoop(cctx)
})

wg.Go(func() error {
return s.capture.Run(cctx)
})

}()
return s.capture.Run(ctx)
return wg.Wait()
}

// Close closes the server.
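The structural change in server.go is swapping the old goroutine plus panic/recover arrangement for errgroup.WithContext: the campaign-owner loop and capture.Run now share one derived context, the first routine to return a non-nil error cancels that context for the other, and wg.Wait() hands the error back to run (and from there to the restart loop in Server.Run). A minimal sketch of the same wiring, with loopA and loopB standing in for campaignOwnerLoop and capture.Run:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

var errSuicide = errors.New("capture suicide")

// loopA stands in for campaignOwnerLoop: it blocks until the shared context
// is cancelled, then exits cleanly.
func loopA(ctx context.Context) error {
	<-ctx.Done()
	fmt.Println("loopA: context cancelled, exiting")
	return nil
}

// loopB stands in for capture.Run: it fails after a while, which cancels the
// errgroup's context and therefore unblocks loopA.
func loopB(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(100 * time.Millisecond):
		return errSuicide
	}
}

func main() {
	wg, cctx := errgroup.WithContext(context.Background())
	wg.Go(func() error { return loopA(cctx) })
	wg.Go(func() error { return loopB(cctx) })
	// Wait returns the first non-nil error, just as the new Server.run
	// returns it to the restart loop in Server.Run.
	fmt.Println("run exited with:", wg.Wait())
}
```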
41 changes: 41 additions & 0 deletions tests/availability/owner.sh
@@ -13,8 +13,10 @@ function test_owner_ha() {
test_kill_owner
test_hang_up_owner
test_expire_owner
test_owner_cleanup_stale_tasks
# FIXME: this test case should cover the owner crashing during task cleanup
# test_owner_cleanup_stale_tasks
test_owner_retryable_error
test_gap_between_watch_capture
}
# test_kill_owner starts two captures and kill the owner
@@ -150,6 +152,45 @@ function test_owner_cleanup_stale_tasks() {
ensure $MAX_RETRIES nonempty 'select id, val from test.availability1 where id=1 and val=22'
run_sql "DELETE from test.availability1 where id=1;"
ensure $MAX_RETRIES empty 'select id, val from test.availability1 where id=1'

echo "test_owner_cleanup_stale_tasks pass"
cleanup_process $CDC_BINARY
}

# test some retryable errors encountered in the campaign owner loop
function test_owner_retryable_error() {
echo "run test case test_owner_retryable_error"

export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/capture-campaign-compacted-error=1*return(true)'

# start a capture server
run_cdc_server $WORK_DIR $CDC_BINARY server1
# ensure the server becomes the owner
ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
owner_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}')
echo "owner pid:" $owner_pid
echo "owner id" $owner_id

export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner-run-with-error=1*return(true);github.com/pingcap/ticdc/cdc/capture-resign-failed=1*return(true)'

# run another server
run_cdc_server $WORK_DIR $CDC_BINARY server2
ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id"
capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}' | grep -v "$owner_pid")
capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id")
echo "capture_id:" $capture_id

# Resign the first capture, so the second capture campaigns to be the owner.
# However, we have injected two failpoints: the second capture's owner runs
# with an error, and resigning the owner also fails before it exits, so the
# second capture will exit and the first capture campaigns to be the owner again.
curl -X POST http://127.0.0.1:8300/capture/owner/resign
ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep $owner_id -A1 | grep '\"is-owner\": true'"
ensure $MAX_RETRIES "ps -C $CDC_BINARY -o pid= | awk '{print \$1}' | wc -l | grep 1"

echo "test_owner_retryable_error pass"
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
}

