diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index ccaead5c5b6..76ee5d546e3 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -190,16 +190,19 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { h.summaryPendingInfluence() + stores := cluster.GetStores() storesLoads := cluster.GetStoresLoads() { // update read statistics regionRead := cluster.RegionReadStats() h.stLoadInfos[readLeader] = summaryStoresLoad( + stores, storesLoads, h.pendingSums, regionRead, read, core.LeaderKind) h.stLoadInfos[readPeer] = summaryStoresLoad( + stores, storesLoads, h.pendingSums, regionRead, @@ -209,12 +212,14 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { { // update write statistics regionWrite := cluster.RegionWriteStats() h.stLoadInfos[writeLeader] = summaryStoresLoad( + stores, storesLoads, h.pendingSums, regionWrite, write, core.LeaderKind) h.stLoadInfos[writePeer] = summaryStoresLoad( + stores, storesLoads, h.pendingSums, regionWrite, @@ -246,6 +251,7 @@ func (h *hotScheduler) gcRegionPendings() { // summaryStoresLoad Load information of all available stores. // it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store func summaryStoresLoad( + stores []*core.StoreInfo, storesLoads map[uint64][]float64, storePendings map[uint64]*Influence, storeHotPeers map[uint64][]*statistics.HotPeerStat, @@ -258,7 +264,12 @@ func summaryStoresLoad( allCount := 0.0 // Stores without byte rate statistics is not available to schedule. - for id, storeLoads := range storesLoads { + for _, store := range stores { + id := store.GetID() + storeLoads, ok := storesLoads[id] + if !ok { + continue + } loads := make([]float64, statistics.DimLen) switch rwTy { case read: @@ -310,6 +321,7 @@ func summaryStoresLoad( // Construct store load info. loadDetail[id] = &storeLoadDetail{ + Store: store, LoadPred: stLoadPred, HotPeers: hotPeers, } @@ -558,10 +570,6 @@ func (bs *balanceSolver) solve() []*operator.Operator { func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { ret := make(map[uint64]*storeLoadDetail) for id, detail := range bs.stLoadDetail { - if bs.cluster.GetStore(id) == nil { - log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore)) - continue - } if len(detail.HotPeers) == 0 { continue } @@ -705,12 +713,9 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo { func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { var ( filters []filter.Filter - candidates []*core.StoreInfo + candidates []*storeLoadDetail ) - srcStore := bs.cluster.GetStore(bs.cur.srcStoreID) - if srcStore == nil { - return nil - } + srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store switch bs.opTy { case movePeer: filters = []filter.Filter{ @@ -720,8 +725,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { filter.NewPlacementSafeguard(bs.sche.GetName(), bs.cluster, bs.cur.region, srcStore), } - for storeID := range bs.stLoadDetail { - candidates = append(candidates, bs.cluster.GetStore(storeID)) + for _, detail := range bs.stLoadDetail { + candidates = append(candidates, detail) } case transferLeader: @@ -733,9 +738,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { filters = append(filters, leaderFilter) } - for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) { - if _, ok := bs.stLoadDetail[store.GetID()]; ok { - candidates = append(candidates, store) + for _, peer := range bs.cur.region.GetFollowers() { + if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok { + candidates = append(candidates, detail) } } @@ -745,17 +750,17 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { return bs.pickDstStores(filters, candidates) } -func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail { +func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail { ret := make(map[uint64]*storeLoadDetail, len(candidates)) dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio() - for _, store := range candidates { + for _, detail := range candidates { + store := detail.Store if filter.Target(bs.cluster.GetOpts(), store, filters) { - detail := bs.stLoadDetail[store.GetID()] maxLoads := detail.LoadPred.max().Loads if slice.AllOf(maxLoads, func(i int) bool { return maxLoads[i]*dstToleranceRatio < detail.LoadPred.Expect.Loads[i] }) { - ret[store.GetID()] = bs.stLoadDetail[store.GetID()] + ret[store.GetID()] = detail hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc() } hotSchedulerResultCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc() diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index 33b33961255..8d7b749ba02 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -132,10 +132,12 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op } func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator { + stores := cluster.GetStores() storesLoads := cluster.GetStoresLoads() switch typ { case read: s.stLoadInfos[readLeader] = summaryStoresLoad( + stores, storesLoads, map[uint64]*Influence{}, cluster.RegionReadStats(), @@ -143,6 +145,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) [] return s.randomSchedule(cluster, s.stLoadInfos[readLeader]) case write: s.stLoadInfos[writeLeader] = summaryStoresLoad( + stores, storesLoads, map[uint64]*Influence{}, cluster.RegionWriteStats(), diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 721ec0e57a9..0847a52d045 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -395,6 +395,7 @@ func maxLoad(a, b *storeLoad) *storeLoad { } type storeLoadDetail struct { + Store *core.StoreInfo LoadPred *storeLoadPred HotPeers []*statistics.HotPeerStat }