Skip to content

Commit

Permalink
Supply AwakenRegions message in StoreHeartbeatResponse for TiKV clust…
Browse files Browse the repository at this point in the history
…er failure recovery. (#5625)

close #5626

Supply extra AwakenRegions message in StoreHeartbeatResponse for the TiKV cluster when there exists abnormal TiKV node in the cluster, to wake up hibernated regions in time.

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
LykxSassinator and ti-chi-bot authored Nov 4, 2022
1 parent 5b7c29e commit ea9b1e9
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 6 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,8 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
)

// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
47 changes: 47 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,53 @@ func (c *RaftCluster) SlowStoreRecovered(storeID uint64) {
c.core.SlowStoreRecovered(storeID)
}

// NeedAwakenAllRegionsInStore checks whether we should do AwakenRegions operation.
func (c *RaftCluster) NeedAwakenAllRegionsInStore(storeID uint64) (needAwaken bool, slowStoreIDs []uint64) {
store := c.GetStore(storeID)
// We just return AwakenRegions messages to those Serving stores which need to be awaken.
if store.IsSlow() || !store.NeedAwakenStore() {
return false, nil
}

needAwaken = false
for _, store := range c.GetStores() {
if store.IsRemoved() {
continue
}

// We will filter out heartbeat requests from slowStores.
if (store.IsUp() || store.IsRemoving()) && store.IsSlow() &&
store.GetStoreStats().GetStoreId() != storeID {
needAwaken = true
slowStoreIDs = append(slowStoreIDs, store.GetID())
}
}
return needAwaken, slowStoreIDs
}

// UpdateAwakenStoreTime updates the last awaken time for the store.
func (c *RaftCluster) UpdateAwakenStoreTime(storeID uint64, lastAwakenTime time.Time) error {
c.Lock()
defer c.Unlock()

store := c.GetStore(storeID)
if store == nil {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}

if store.IsRemoved() {
return errs.ErrStoreRemoved.FastGenByArgs(storeID)
}

if store.IsPhysicallyDestroyed() {
return errs.ErrStoreDestroyed.FastGenByArgs(storeID)
}

newStore := store.Clone(core.SetLastAwakenTime(lastAwakenTime))

return c.putStoreLocked(newStore)
}

// UpStore up a store from offline
func (c *RaftCluster) UpStore(storeID uint64) error {
c.Lock()
Expand Down
29 changes: 29 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,35 @@ func TestCheckStaleRegion(t *testing.T) {
re.Error(checkStaleRegion(region.GetMeta(), origin.GetMeta()))
}

func TestAwakenStore(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, err := newTestScheduleConfig()
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
n := uint64(3)
stores := newTestStores(n, "6.0.0")
re.False(stores[0].NeedAwakenStore())
for _, store := range stores {
re.NoError(cluster.PutStore(store.GetMeta()))
}
for i := uint64(1); i <= n; i++ {
needAwaken, _ := cluster.NeedAwakenAllRegionsInStore(i)
re.False(needAwaken)
}

now := time.Now()
store4 := stores[0].Clone(core.SetLastHeartbeatTS(now), core.SetLastAwakenTime(now.Add(-31*time.Second)))
re.NoError(cluster.putStoreLocked(store4))
store1 := cluster.GetStore(1)
re.True(store1.NeedAwakenStore())
re.NoError(cluster.UpdateAwakenStoreTime(1, now))
store1 = cluster.GetStore(1)
re.False(store1.NeedAwakenStore())
}

type testCluster struct {
*RaftCluster
}
Expand Down
21 changes: 15 additions & 6 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
storePersistInterval = 5 * time.Minute
initialMinSpace = 8 * units.GiB // 2^33=8GB
slowStoreThreshold = 80
awakenStoreInterval = 30 * time.Second

// EngineKey is the label key used to indicate engine.
EngineKey = "engine"
Expand Down Expand Up @@ -60,17 +61,19 @@ type StoreInfo struct {
regionWeight float64
limiter map[storelimit.Type]*storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
storeInfo := &StoreInfo{
meta: store,
storeStats: newStoreStats(),
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
minResolvedTS: 0,
meta: store,
storeStats: newStoreStats(),
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
minResolvedTS: 0,
lastAwakenTime: time.Now(),
}
for _, opt := range opts {
opt(storeInfo)
Expand Down Expand Up @@ -469,6 +472,12 @@ func (s *StoreInfo) GetMinResolvedTS() uint64 {
return s.minResolvedTS
}

// NeedAwakenStore checks whether all hibernated regions in this store should
// be awaken or not.
func (s *StoreInfo) NeedAwakenStore() bool {
return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's
Expand Down
7 changes: 7 additions & 0 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,10 @@ func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCrea
store.limiter[limitType] = storelimit.NewStoreLimit(ratePerSec[0], storelimit.RegionInfluence[limitType])
}
}

// SetLastAwakenTime sets last awaken time for the store.
func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
return func(store *StoreInfo) {
store.lastAwakenTime = lastAwaken
}
}
15 changes: 15 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,21 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
ClusterVersion: rc.GetClusterVersion(),
}
rc.GetUnsafeRecoveryController().HandleStoreHeartbeat(request, resp)

// If this cluster has slow stores, we should awaken hibernated regions in other stores.
// TODO: waited to be polished. It's recommended to merge following AwakenRegions checking
// and UpdateAwakenStoreTime into HandlStoreHeartbeat.
if needAwaken, slowStoreIDs := rc.NeedAwakenAllRegionsInStore(storeID); needAwaken {
log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
err := rc.UpdateAwakenStoreTime(storeID, time.Now())
if err != nil {
log.Warn("failed to awaken hibernated regions in store", zap.Uint64("store-id", storeID))
} else {
resp.AwakenRegions = &pdpb.AwakenRegions{
AbnormalStores: slowStoreIDs,
}
}
}
return resp, nil
}

Expand Down

0 comments on commit ea9b1e9

Please sign in to comment.