Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

leadership: check leadership key before exit watch loop #6979

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
add watcher close
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Aug 21, 2023
commit 1e1e7def394edea885267483e9f2c6957774ac86
13 changes: 11 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,17 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
}

var watcherCancel context.CancelFunc
var (
watcher clientv3.Watcher
watcherCancel context.CancelFunc
)
defer func() {
if watcherCancel != nil {
watcherCancel()
}
if watcher != nil {
watcher.Close()
}
}()
unhealthyTimeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
Expand All @@ -207,9 +213,12 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
if watcherCancel != nil {
watcherCancel()
}
if watcher != nil {
watcher.Close()
}
watcher = clientv3.NewWatcher(ls.client)
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcher := clientv3.NewWatcher(ls.client)
watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
watcherCancel = cancel

Expand Down
15 changes: 12 additions & 3 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,11 +683,17 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
}

func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
var watcherCancel context.CancelFunc
var (
watcher clientv3.Watcher
watcherCancel context.CancelFunc
)
defer func() {
if watcherCancel != nil {
watcherCancel()
}
if watcher != nil {
watcher.Close()
}
}()
ticker := time.NewTicker(RequestProgressInterval)
defer ticker.Stop()
Expand All @@ -697,12 +703,15 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
if watcherCancel != nil {
watcherCancel()
}
if watcher != nil {
watcher.Close()
}
watcher = clientv3.NewWatcher(lw.client)
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcher := clientv3.NewWatcher(lw.client)
watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
watcherCancel = cancel
opts := append(lw.opts, clientv3.WithRev(revision))
opts := append(lw.opts, clientv3.WithRev(revision), clientv3.WithProgressNotify())
watchChan := watcher.Watch(watcherCtx, lw.key, opts...)
WatchChanLoop:
select {
Expand Down
Loading