
leadership: check leadership key before exit watch loop #6979

Draft · wants to merge 13 commits into base: master
add unit test
Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 committed Aug 22, 2023
commit bdb3f4755ec8ec7a439af40d4018c0c73dd00f92
45 changes: 27 additions & 18 deletions pkg/election/leadership.go
@@ -188,6 +188,14 @@
return
}

unhealthyTimeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
unhealthyTimeout = 5 * time.Second
})
ticker := time.NewTicker(etcdutil.RequestProgressInterval)
defer ticker.Stop()
lastReceivedResponseTime := time.Now()

var (
watcher clientv3.Watcher
watcherCancel context.CancelFunc
@@ -200,13 +208,6 @@
watcher.Close()
}
}()
unhealthyTimeout := watchLoopUnhealthyTimeout
failpoint.Inject("fastTick", func() {
unhealthyTimeout = 5 * time.Second
})
ticker := time.NewTicker(etcdutil.RequestProgressInterval)
defer ticker.Stop()
lastReceivedResponseTime := time.Now()

for {
failpoint.Inject("delayWatcher", nil)
@@ -226,7 +227,7 @@
// so we check the etcd availability first.
if !etcdutil.IsHealthy(serverCtx, ls.client) {
if time.Since(lastReceivedResponseTime) > unhealthyTimeout {
log.Error("the connect of leadership watcher is unhealthy",
log.Error("the connection of the leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
@@ -254,29 +255,37 @@
zap.String("purpose", ls.purpose))
return
case <-ticker.C:
// When etcd is not available, the watcher.RequestProgress will block,
// so we check the etcd availability first.
if !etcdutil.IsHealthy(serverCtx, ls.client) {
// When etcd is not available, the watcher.RequestProgress will block,
// so we check the etcd availability first.
log.Error("the connect of leadership watcher is unhealthy",
log.Error("the connection of the leadership watcher is unhealthy",
zap.Int64("revision", revision),
zap.String("leader-key", ls.leaderKey),
zap.String("purpose", ls.purpose))
continue
}
// need to request progress to etcd to prevent etcd hold the watchChan,
// note: we need to use the same ctx with watcher.
// We need to request progress to etcd to prevent etcd hold the watchChan,
// note: we must use the same ctx with watcher.
if err := watcher.RequestProgress(watcherCtx); err != nil {
log.Warn("failed to request progress in leader watch loop", zap.Error(err))

[Codecov: added line pkg/election/leadership.go#L270 not covered by tests]
}
// If no message comes from an etcd watchChan for WatchChTimeoutDuration,
// create a new one and need not to reset lastReceivedResponseTime.
if time.Since(lastReceivedResponseTime) >= etcdutil.WatchChTimeoutDuration {
// If no msg comes from an etcd watchChan for WatchChTimeoutDuration long,
// we should cancel the watchChan and request a new watchChan from watcher.
log.Warn("watchChan is blocked for a long time, recreate a new watchChan")
failpoint.Inject("watchChanBlock", func() {
// we detect whether this branch is entered by returning directly when the failpoint is injected.
failpoint.Return()
})
log.Warn("watchChan is blocked for a long time, recreating a new watchChan")
continue

[Codecov: added lines pkg/election/leadership.go#L279-L280 not covered by tests]
}
case wresp := <-watchChan:
failpoint.Inject("watchChanBlock", func() {
// watchChanBlock is used to simulate the case that the watchChan is blocked for a long time.
// So we discard these responses when the failpoint is injected.
failpoint.Goto("WatchChanLoop")
})
lastReceivedResponseTime = time.Now()
// meet compacted error, use the compact revision.
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
@@ -305,7 +314,7 @@
}
revision = wresp.Header.Revision + 1
}
goto WatchChanLoop // use goto to avoid to create a new watchChan
goto WatchChanLoop // Use goto to avoid creating a new watchChan
}
}

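For orientation, here is a minimal, self-contained sketch (not PD's actual code) of the keepalive pattern the hunks above implement: a ticker drives `watcher.RequestProgress` so that a healthy stream keeps producing responses, and the watch channel is recreated from the last known revision once nothing has been received within a timeout. The constants, the logger, and the etcd client import path are illustrative assumptions.

```go
package watchsketch

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// watchWithKeepalive watches a single key, periodically asks etcd for a progress
// notification, and recreates the watch channel from the last known revision when
// no response has arrived within watchChTimeout. The two constants stand in for
// etcdutil.RequestProgressInterval and etcdutil.WatchChTimeoutDuration.
func watchWithKeepalive(ctx context.Context, client *clientv3.Client, key string, rev int64) {
	watcher := clientv3.NewWatcher(client)
	defer watcher.Close()

	const (
		progressInterval = 10 * time.Second // assumed value
		watchChTimeout   = 30 * time.Second // assumed value
	)
	ticker := time.NewTicker(progressInterval)
	defer ticker.Stop()
	lastReceived := time.Now()

	for {
		watcherCtx, cancel := context.WithCancel(ctx)
		watchChan := watcher.Watch(watcherCtx, key,
			clientv3.WithRev(rev), clientv3.WithProgressNotify())
	WatchChanLoop:
		select {
		case <-ctx.Done():
			cancel()
			return
		case <-ticker.C:
			// Ask etcd to emit a progress notification so lastReceived keeps
			// advancing while the stream is healthy.
			if err := watcher.RequestProgress(watcherCtx); err != nil {
				log.Printf("failed to request progress: %v", err)
			}
			if time.Since(lastReceived) >= watchChTimeout {
				cancel() // the channel looks stuck: drop it and open a new one
				continue
			}
			goto WatchChanLoop
		case wresp := <-watchChan:
			lastReceived = time.Now()
			if wresp.CompactRevision != 0 {
				rev = wresp.CompactRevision // required revision was compacted
				cancel()
				continue
			}
			if wresp.Canceled {
				cancel()
				return
			}
			rev = wresp.Header.Revision + 1
			goto WatchChanLoop // keep draining the same watch channel
		}
	}
}
```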
6 changes: 5 additions & 1 deletion pkg/election/leadership_test.go
@@ -179,7 +179,11 @@ func TestExitWatch(t *testing.T) {
server.Server.HardStop()
client1.Delete(context.Background(), leaderKey)
})
// TODO: add test to simulate the case that pd leader is io hang.
// Case7: whether request progress is valid
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/watchChanBlock", "return(true)"))
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/watchChanBlock"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/fastTick"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}
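Case7 above relies on the watchChanBlock failpoint added to leadership.go. As a rough sketch of how the pingcap/failpoint markers and the test-side toggle interact (the package, function, and failpoint names below are illustrative, not PD's real ones):

```go
package fpsketch

import (
	"time"

	"github.com/pingcap/failpoint"
)

// watchLoopStub mimics the shape of the production watch loop. When the
// "blockedWatchChan" failpoint is enabled with "return(true)", the body passed to
// failpoint.Inject runs and failpoint.Return() is rewritten (by failpoint-ctl) into
// a return from watchLoopStub, so the loop exits early; otherwise both markers are
// no-ops.
func watchLoopStub() {
	for {
		failpoint.Inject("blockedWatchChan", func() {
			failpoint.Return() // exit the loop only while the failpoint is enabled
		})
		// ... normal watch-loop work would go here ...
		time.Sleep(100 * time.Millisecond)
	}
}

// enableForTest mirrors the test-side pattern above: enable the failpoint for the
// duration of a test case, exercise the code containing the Inject marker, then
// disable it again.
func enableForTest() error {
	const fp = "github.com/example/fpsketch/blockedWatchChan" // illustrative failpoint path
	if err := failpoint.Enable(fp, "return(true)"); err != nil {
		return err
	}
	// ... run the code path that reaches watchLoopStub ...
	return failpoint.Disable(fp)
}
```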
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/hot_region_v2_test.go
@@ -404,7 +404,7 @@ func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) {
re.Len(ops, 1)
re.NotEqual(hotRegionID, ops[0].RegionID())

// Case5: If there are more than topnPosition hot regions, but them need to cool down,
// Case6: If there are more than topnPosition hot regions, but them need to cool down,
// we will schedule large hot region rather than small hot region, so there is no operator.
topnPosition = 2
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, _ *hotScheduler) {
@@ -418,7 +418,7 @@ func TestHotReadRegionScheduleWithSmallHotRegion(t *testing.T) {
re.Len(ops, 0)
topnPosition = origin

// Case6: If there are more than topnPosition hot regions, but them are pending,
// Case7: If there are more than topnPosition hot regions, but them are pending,
// we will schedule large hot region rather than small hot region, so there is no operator.
topnPosition = 2
ops = checkHotReadRegionScheduleWithSmallHotRegion(re, highLoad, lowLoad, func(tc *mockcluster.Cluster, hb *hotScheduler) {
33 changes: 21 additions & 12 deletions pkg/utils/etcdutil/etcdutil.go
@@ -717,26 +717,35 @@
select {
case <-ctx.Done():
return revision, nil
case <-lw.forceLoadCh:
revision, err = lw.load(ctx)
if err != nil {
log.Warn("force load key failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
continue
case <-ticker.C:
// need to request progress to etcd to prevent etcd hold the watchChan,
// We need to request progress to etcd to prevent etcd hold the watchChan,
// note: we need to use the same ctx with watcher.
if err := watcher.RequestProgress(watcherCtx); err != nil {
log.Warn("failed to request progress in leader watch loop", zap.Error(err))
}

[Codecov: added lines pkg/utils/etcdutil/etcdutil.go#L731-L732 not covered by tests]
// If no message comes from an etcd watchChan for WatchChTimeoutDuration,
// create a new one and need not to reset lastReceivedResponseTime.
if time.Since(lastReceivedResponseTime) >= WatchChTimeoutDuration {
// If no msg comes from an etcd watchChan for WatchChTimeoutDuration long,
// we should cancel the watchChan and request a new watchChan from watcher.
log.Warn("watchChan is blocked for a long time, recreate a new watchChan")
failpoint.Inject("watchChanBlock", func() {
// we detect whether this branch is entered by returning directly when the failpoint is injected.
failpoint.Return()
})
log.Warn("watchChan is blocked for a long time, recreating a new watchChan")
continue
}
case <-lw.forceLoadCh:
revision, err = lw.load(ctx)
if err != nil {
log.Warn("force load key failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
continue
case wresp := <-watchChan:
failpoint.Inject("watchChanBlock", func() {
// watchChanBlock is used to simulate the case that the watchChan is blocked for a long time.
// So we discard these responses when the failpoint is injected.
failpoint.Goto("WatchChanLoop")
})
lastReceivedResponseTime = time.Now()
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision in watch loop",
@@ -749,8 +758,8 @@
zap.Int64("revision", revision),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return revision, wresp.Err()
} else if wresp.IsProgressNotify() {
goto WatchChanLoop

[Codecov: added lines pkg/utils/etcdutil/etcdutil.go#L761-L762 not covered by tests]
}
for _, event := range wresp.Events {
switch event.Type {
@@ -779,7 +788,7 @@
}
revision = wresp.Header.Revision + 1
}
goto WatchChanLoop // use goto to avoid to create a new watchChan
goto WatchChanLoop // use goto to avoid creating a new watchChan
}
}

30 changes: 30 additions & 0 deletions pkg/utils/etcdutil/etcdutil_test.go
@@ -767,6 +767,36 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/updateClient")
}

func (suite *loopWatcherTestSuite) TestWatcherChanBlock() {
watcher := NewLoopWatcher(
suite.ctx,
&suite.wg,
suite.client,
"test",
"TestWatcherChanBlock",
func(kv *mvccpb.KeyValue) error { return nil },
func(kv *mvccpb.KeyValue) error { return nil },
func() error { return nil },
)
done := make(chan struct{})
go func() {
watcher.watch(suite.ctx, 0)
done <- struct{}{}
}()

failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/watchChanBlock", "return(true)")

testutil.Eventually(suite.Require(), func() bool {
select {
case <-done:
return true
default:
return false
}
})
failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/watchChanBlock")
}

func (suite *loopWatcherTestSuite) startEtcd() {
etcd1, err := embed.StartEtcd(suite.config)
suite.NoError(err)
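The new TestWatcherChanBlock polls the done channel via testutil.Eventually until the watch loop exits. A generic restatement of that polling (illustrative, not PD's testutil API) looks like this:

```go
package pollsketch

import "time"

// waitDone polls a done channel without blocking until it is signalled or the
// timeout expires, which is what the Eventually block in the new test does.
func waitDone(done <-chan struct{}, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		select {
		case <-done:
			return true
		default:
			time.Sleep(interval)
		}
	}
	return false
}
```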