diff --git a/go.mod b/go.mod index bbaadb64d69..ded55700181 100644 --- a/go.mod +++ b/go.mod @@ -175,3 +175,8 @@ require ( moul.io/zapgorm2 v1.1.0 // indirect sigs.k8s.io/yaml v1.1.0 // indirect ) + +// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to +// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification. +// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`. +// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 84ffb1652d7..a687e841cfe 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1349,6 +1349,53 @@ func (c *RaftCluster) SlowStoreRecovered(storeID uint64) { c.core.SlowStoreRecovered(storeID) } +// NeedAwakenAllRegionsInStore checks whether we should do AwakenRegions operation. +func (c *RaftCluster) NeedAwakenAllRegionsInStore(storeID uint64) (needAwaken bool, slowStoreIDs []uint64) { + store := c.GetStore(storeID) + // We just return AwakenRegions messages to those Serving stores which need to be awaken. + if store.IsSlow() || !store.NeedAwakenStore() { + return false, nil + } + + needAwaken = false + for _, store := range c.GetStores() { + if store.IsRemoved() { + continue + } + + // We will filter out heartbeat requests from slowStores. + if (store.IsUp() || store.IsRemoving()) && store.IsSlow() && + store.GetStoreStats().GetStoreId() != storeID { + needAwaken = true + slowStoreIDs = append(slowStoreIDs, store.GetID()) + } + } + return needAwaken, slowStoreIDs +} + +// UpdateAwakenStoreTime updates the last awaken time for the store. +func (c *RaftCluster) UpdateAwakenStoreTime(storeID uint64, lastAwakenTime time.Time) error { + c.Lock() + defer c.Unlock() + + store := c.GetStore(storeID) + if store == nil { + return errs.ErrStoreNotFound.FastGenByArgs(storeID) + } + + if store.IsRemoved() { + return errs.ErrStoreRemoved.FastGenByArgs(storeID) + } + + if store.IsPhysicallyDestroyed() { + return errs.ErrStoreDestroyed.FastGenByArgs(storeID) + } + + newStore := store.Clone(core.SetLastAwakenTime(lastAwakenTime)) + + return c.putStoreLocked(newStore) +} + // UpStore up a store from offline func (c *RaftCluster) UpStore(storeID uint64) error { c.Lock() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index dabbe5e2f91..3821fcbeb26 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1748,6 +1748,35 @@ func TestCheckStaleRegion(t *testing.T) { re.Error(checkStaleRegion(region.GetMeta(), origin.GetMeta())) } +func TestAwakenStore(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, opt, err := newTestScheduleConfig() + re.NoError(err) + cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + n := uint64(3) + stores := newTestStores(n, "6.0.0") + re.False(stores[0].NeedAwakenStore()) + for _, store := range stores { + re.NoError(cluster.PutStore(store.GetMeta())) + } + for i := uint64(1); i <= n; i++ { + needAwaken, _ := cluster.NeedAwakenAllRegionsInStore(i) + re.False(needAwaken) + } + + now := time.Now() + store4 := stores[0].Clone(core.SetLastHeartbeatTS(now), core.SetLastAwakenTime(now.Add(-31*time.Second))) + re.NoError(cluster.putStoreLocked(store4)) + store1 := cluster.GetStore(1) + re.True(store1.NeedAwakenStore()) + re.NoError(cluster.UpdateAwakenStoreTime(1, now)) + store1 = cluster.GetStore(1) + re.False(store1.NeedAwakenStore()) +} + type testCluster struct { *RaftCluster } diff --git a/server/core/store.go b/server/core/store.go index 49752de41d1..01157f2e3d0 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -33,6 +33,7 @@ const ( storePersistInterval = 5 * time.Minute initialMinSpace = 8 * units.GiB // 2^33=8GB slowStoreThreshold = 80 + awakenStoreInterval = 30 * time.Second // EngineKey is the label key used to indicate engine. EngineKey = "engine" @@ -60,17 +61,19 @@ type StoreInfo struct { regionWeight float64 limiter map[storelimit.Type]*storelimit.StoreLimit minResolvedTS uint64 + lastAwakenTime time.Time } // NewStoreInfo creates StoreInfo with meta data. func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo { storeInfo := &StoreInfo{ - meta: store, - storeStats: newStoreStats(), - leaderWeight: 1.0, - regionWeight: 1.0, - limiter: make(map[storelimit.Type]*storelimit.StoreLimit), - minResolvedTS: 0, + meta: store, + storeStats: newStoreStats(), + leaderWeight: 1.0, + regionWeight: 1.0, + limiter: make(map[storelimit.Type]*storelimit.StoreLimit), + minResolvedTS: 0, + lastAwakenTime: time.Now(), } for _, opt := range opts { opt(storeInfo) @@ -469,6 +472,12 @@ func (s *StoreInfo) GetMinResolvedTS() uint64 { return s.minResolvedTS } +// NeedAwakenStore checks whether all hibernated regions in this store should +// be awaken or not. +func (s *StoreInfo) NeedAwakenStore() bool { + return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval +} + var ( // If a store's last heartbeat is storeDisconnectDuration ago, the store will // be marked as disconnected state. The value should be greater than tikv's diff --git a/server/core/store_option.go b/server/core/store_option.go index 3d66097bd6b..e6ee7965afa 100644 --- a/server/core/store_option.go +++ b/server/core/store_option.go @@ -243,3 +243,10 @@ func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCrea store.limiter[limitType] = storelimit.NewStoreLimit(ratePerSec[0], storelimit.RegionInfluence[limitType]) } } + +// SetLastAwakenTime sets last awaken time for the store. +func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption { + return func(store *StoreInfo) { + store.lastAwakenTime = lastAwaken + } +} diff --git a/server/grpc_service.go b/server/grpc_service.go index 2165e2e7526..2e111a12923 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -664,6 +664,21 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear ClusterVersion: rc.GetClusterVersion(), } rc.GetUnsafeRecoveryController().HandleStoreHeartbeat(request, resp) + + // If this cluster has slow stores, we should awaken hibernated regions in other stores. + // TODO: waited to be polished. It's recommended to merge following AwakenRegions checking + // and UpdateAwakenStoreTime into HandlStoreHeartbeat. + if needAwaken, slowStoreIDs := rc.NeedAwakenAllRegionsInStore(storeID); needAwaken { + log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs)) + err := rc.UpdateAwakenStoreTime(storeID, time.Now()) + if err != nil { + log.Warn("failed to awaken hibernated regions in store", zap.Uint64("store-id", storeID)) + } else { + resp.AwakenRegions = &pdpb.AwakenRegions{ + AbnormalStores: slowStoreIDs, + } + } + } return resp, nil }