Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

leadership: check leadership key before exit watch loop #6979

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
fix test
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Aug 21, 2023
commit 1402111db599b872e584bff2e2583fe776739353
16 changes: 13 additions & 3 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
}

var watcherCancel context.CancelFunc
defer func() {
if watcherCancel != nil {
watcherCancel()
}
}()
unhealthyTimeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
unhealthyTimeout = 5 * time.Second
Expand All @@ -198,7 +204,14 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {

for {
failpoint.Inject("delayWatcher", nil)
if watcherCancel != nil {
watcherCancel()
}
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcher := clientv3.NewWatcher(ls.client)
watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
watcherCancel = cancel

// When etcd is not available, the watcher.Watch will block,
// so we check the etcd availability first.
Expand All @@ -222,9 +235,6 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
}
}

// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcherCtx := clientv3.WithRequireLeader(serverCtx)
watchChan := watcher.Watch(watcherCtx, ls.leaderKey,
clientv3.WithRev(revision), clientv3.WithProgressNotify())
WatchChanLoop:
Expand Down
12 changes: 11 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,15 +683,25 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
}

func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
var watcherCancel context.CancelFunc
defer func() {
if watcherCancel != nil {
watcherCancel()
}
}()
ticker := time.NewTicker(RequestProgressInterval)
defer ticker.Stop()
lastReceivedResponseTime := time.Now()

for {
if watcherCancel != nil {
watcherCancel()
}
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watcher := clientv3.NewWatcher(lw.client)
watcherCtx := clientv3.WithRequireLeader(ctx)
watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
watcherCancel = cancel
opts := append(lw.opts, clientv3.WithRev(revision))
watchChan := watcher.Watch(watcherCtx, lw.key, opts...)
WatchChanLoop:
Expand Down
Loading