Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

leadership: check leadership key before exit watch loop #6979

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
Next Next commit
etcdutil, leadership: use RequestProgress in watch loop
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Aug 16, 2023
commit e2c17b205bcaa2e881db78dd0a749da7b7c35a67
37 changes: 17 additions & 20 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

const (
watchLoopUnhealthyTimeout = 60 * time.Second
detectHealthyInterval = 10 * time.Second
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
Expand Down Expand Up @@ -189,15 +188,9 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
}

interval := detectHealthyInterval
unhealthyTimeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
unhealthyTimeout = 5 * time.Second
interval = 1 * time.Second
})
ticker := time.NewTicker(interval)
ticker := time.NewTicker(etcdutil.RequestProgressInterval)
defer ticker.Stop()
lastHealthyTime := time.Now()
lastReceivedResponseTime := time.Now()

watcher := clientv3.NewWatcher(ls.client)
defer watcher.Close()
Expand All @@ -220,7 +213,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
// When etcd is not available, the watcher.Watch will block,
// so we check the etcd availability first.
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastHealthyTime) > unhealthyTimeout {
if time.Since(lastReceivedResponseTime) > watchLoopUnhealthyTimeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
Expand Down Expand Up @@ -249,23 +242,26 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
case <-ticker.C:
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastHealthyTime) > unhealthyTimeout {
log.Error("the connect of leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
return
}
goto WatchChanLoop
// When etcd is not available, the watcher.RequestProgress will block,
// so we check the etcd availability first.
continue
}
if err := watcher.RequestProgress(serverCtx); err != nil {
log.Warn("failed to request progress in watch loop", zap.Error(err))
}
if time.Since(lastReceivedResponseTime) >= etcdutil.WatchChTimeoutDuration {
// If no message has arrived from the etcd watchChan for at least WatchChTimeoutDuration,
// we should cancel the watchChan and request a new watchChan from the watcher.
continue
}
case wresp := <-watchChan:
lastReceivedResponseTime = time.Now()
// On a compaction error, retry the watch from the compact revision.
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
lastHealthyTime = time.Now()
continue
} else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
log.Error("leadership watcher is canceled with",
Expand All @@ -274,6 +270,8 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
zap.String("purpose", ls.purpose),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return
} else if wresp.IsProgressNotify() {
goto WatchChanLoop
}

for _, ev := range wresp.Events {
Expand All @@ -287,7 +285,6 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
}
revision = wresp.Header.Revision + 1
}
lastHealthyTime = time.Now()
goto WatchChanLoop // use goto to avoid creating a new watchChan
}
}
Expand Down
24 changes: 23 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,11 @@ const (
defaultLoadBatchSize = 400
defaultWatchChangeRetryInterval = 1 * time.Second
defaultForceLoadMinimalInterval = 200 * time.Millisecond

// RequestProgressInterval is the interval to call RequestProgress for watcher.
RequestProgressInterval = 1 * time.Second
// WatchChTimeoutDuration is the timeout duration for a watchChan.
WatchChTimeoutDuration = DefaultRequestTimeout
)

// LoopWatcher loads data from etcd and sets a watcher for it.
Expand Down Expand Up @@ -678,6 +683,10 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
}

func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision int64, err error) {
ticker := time.NewTicker(RequestProgressInterval)
defer ticker.Stop()
lastReceivedResponseTime := time.Now()

watcher := clientv3.NewWatcher(lw.client)
defer watcher.Close()
var watchChanCancel context.CancelFunc
Expand All @@ -686,6 +695,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
watchChanCancel()
}
}()

for {
if watchChanCancel != nil {
watchChanCancel()
Expand All @@ -700,6 +710,15 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
select {
case <-ctx.Done():
return revision, nil
case <-ticker.C:
if err := watcher.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress in watch loop", zap.Error(err))
}
if time.Since(lastReceivedResponseTime) >= WatchChTimeoutDuration {
// If no message has arrived from the etcd watchChan for at least WatchChTimeoutDuration,
// we should cancel the watchChan and request a new watchChan from the watcher.
continue
}
case <-lw.forceLoadCh:
revision, err = lw.load(ctx)
if err != nil {
Expand All @@ -708,6 +727,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
}
continue
case wresp := <-watchChan:
lastReceivedResponseTime = time.Now()
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision in watch loop",
zap.Int64("required-revision", revision),
Expand All @@ -719,6 +739,8 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
zap.Int64("revision", revision),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return revision, wresp.Err()
} else if wresp.IsProgressNotify() {
goto WatchChanLoop
}
for _, event := range wresp.Events {
switch event.Type {
Expand Down Expand Up @@ -746,8 +768,8 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
zap.String("key", lw.key), zap.Error(err))
}
revision = wresp.Header.Revision + 1
goto WatchChanLoop // use goto to avoid creating a new watchChan
}
goto WatchChanLoop // use goto to avoid creating a new watchChan
}
}

Expand Down
Loading