Commit ca80632

scheduler: reduce GetStore in hot-region-scheduler (#3870) (#3911)
* scheduler: reduce GetStore in hot-region-scheduler

Signed-off-by: HunDunDM <hundundm@gmail.com>

* tiny fix

Signed-off-by: HunDunDM <hundundm@gmail.com>

Co-authored-by: HunDunDM <hundundm@gmail.com>
ti-chi-bot and HunDunDM authored Sep 13, 2021
1 parent aee9940 commit ca80632
Showing 3 changed files with 28 additions and 19 deletions.
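
Before this change, the hot-region scheduler resolved stores by ID with `cluster.GetStore(id)` at several points in a single scheduling pass (notably in `filterSrcStores` and `filterDstStores`), handling a possible nil result each time. The commit instead fetches all stores once with `cluster.GetStores()` in `prepareForBalance` (and in the shuffle scheduler's `dispatch`), threads the slice into `summaryStoresLoad`, and caches each `*core.StoreInfo` on its `storeLoadDetail`, so the later filter phases read `detail.Store` directly. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real PD ones:

```go
package main

import "fmt"

// StoreInfo and Cluster are simplified stand-ins for core.StoreInfo and
// opt.Cluster; they are not the real PD types.
type StoreInfo struct{ ID uint64 }

type Cluster struct{ stores map[uint64]*StoreInfo }

// GetStore is the per-ID lookup the old code repeated in the filter phases.
func (c *Cluster) GetStore(id uint64) *StoreInfo { return c.stores[id] }

// GetStores returns every store in one call, as the new code does once per pass.
func (c *Cluster) GetStores() []*StoreInfo {
	out := make([]*StoreInfo, 0, len(c.stores))
	for _, s := range c.stores {
		out = append(out, s)
	}
	return out
}

// LoadDetail mirrors storeLoadDetail after the commit: it carries the store
// itself, so later phases never call Cluster.GetStore again and never see nil.
type LoadDetail struct {
	Store *StoreInfo
	Load  float64
}

// summarize mirrors the new summaryStoresLoad loop: iterate the store slice
// and skip stores that have no load statistics, instead of ranging over the
// loads map and looking each store up afterwards.
func summarize(stores []*StoreInfo, loads map[uint64]float64) map[uint64]*LoadDetail {
	details := make(map[uint64]*LoadDetail, len(stores))
	for _, s := range stores {
		load, ok := loads[s.ID]
		if !ok {
			continue // stores without statistics are not schedulable
		}
		details[s.ID] = &LoadDetail{Store: s, Load: load}
	}
	return details
}

func main() {
	c := &Cluster{stores: map[uint64]*StoreInfo{1: {ID: 1}, 2: {ID: 2}}}
	details := summarize(c.GetStores(), map[uint64]float64{1: 10.5})
	for id, d := range details {
		fmt.Println(id, d.Store.ID, d.Load) // d.Store is the cached pointer
	}
}
```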
43 changes: 24 additions & 19 deletions server/schedulers/hot_region.go
@@ -190,16 +190,19 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
 func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 	h.summaryPendingInfluence()
 
+	stores := cluster.GetStores()
 	storesLoads := cluster.GetStoresLoads()
 
 	{ // update read statistics
 		regionRead := cluster.RegionReadStats()
 		h.stLoadInfos[readLeader] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			h.pendingSums,
 			regionRead,
 			read, core.LeaderKind)
 		h.stLoadInfos[readPeer] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			h.pendingSums,
 			regionRead,
@@ -209,12 +212,14 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 	{ // update write statistics
 		regionWrite := cluster.RegionWriteStats()
 		h.stLoadInfos[writeLeader] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			h.pendingSums,
 			regionWrite,
 			write, core.LeaderKind)
 
 		h.stLoadInfos[writePeer] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			h.pendingSums,
 			regionWrite,
@@ -246,6 +251,7 @@ func (h *hotScheduler) gcRegionPendings() {
 // summaryStoresLoad Load information of all available stores.
 // it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store
 func summaryStoresLoad(
+	stores []*core.StoreInfo,
 	storesLoads map[uint64][]float64,
 	storePendings map[uint64]*Influence,
 	storeHotPeers map[uint64][]*statistics.HotPeerStat,
@@ -258,7 +264,12 @@ func summaryStoresLoad(
 	allCount := 0.0
 
 	// Stores without byte rate statistics is not available to schedule.
-	for id, storeLoads := range storesLoads {
+	for _, store := range stores {
+		id := store.GetID()
+		storeLoads, ok := storesLoads[id]
+		if !ok {
+			continue
+		}
 		loads := make([]float64, statistics.DimLen)
 		switch rwTy {
 		case read:
@@ -310,6 +321,7 @@ func summaryStoresLoad(
 
 		// Construct store load info.
 		loadDetail[id] = &storeLoadDetail{
+			Store:    store,
 			LoadPred: stLoadPred,
 			HotPeers: hotPeers,
 		}
@@ -558,10 +570,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
 	ret := make(map[uint64]*storeLoadDetail)
 	for id, detail := range bs.stLoadDetail {
-		if bs.cluster.GetStore(id) == nil {
-			log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore))
-			continue
-		}
 		if len(detail.HotPeers) == 0 {
 			continue
 		}
@@ -705,12 +713,9 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
 func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 	var (
 		filters    []filter.Filter
-		candidates []*core.StoreInfo
+		candidates []*storeLoadDetail
 	)
-	srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
-	if srcStore == nil {
-		return nil
-	}
+	srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
 	switch bs.opTy {
 	case movePeer:
 		filters = []filter.Filter{
@@ -720,8 +725,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 			filter.NewPlacementSafeguard(bs.sche.GetName(), bs.cluster, bs.cur.region, srcStore),
 		}
 
-		for storeID := range bs.stLoadDetail {
-			candidates = append(candidates, bs.cluster.GetStore(storeID))
+		for _, detail := range bs.stLoadDetail {
+			candidates = append(candidates, detail)
 		}
 
 	case transferLeader:
@@ -733,9 +738,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 		filters = append(filters, leaderFilter)
 	}
 
-	for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
-		if _, ok := bs.stLoadDetail[store.GetID()]; ok {
-			candidates = append(candidates, store)
+	for _, peer := range bs.cur.region.GetFollowers() {
+		if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
+			candidates = append(candidates, detail)
 		}
 	}
 
@@ -745,17 +750,17 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 	return bs.pickDstStores(filters, candidates)
 }
 
-func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
+func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
 	ret := make(map[uint64]*storeLoadDetail, len(candidates))
 	dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
-	for _, store := range candidates {
+	for _, detail := range candidates {
+		store := detail.Store
 		if filter.Target(bs.cluster.GetOpts(), store, filters) {
-			detail := bs.stLoadDetail[store.GetID()]
 			maxLoads := detail.LoadPred.max().Loads
 			if slice.AllOf(maxLoads, func(i int) bool {
 				return maxLoads[i]*dstToleranceRatio < detail.LoadPred.Expect.Loads[i]
 			}) {
-				ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
+				ret[store.GetID()] = detail
 				hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc()
 			}
 			hotSchedulerResultCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc()
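
Dropping the `GetStore` nil checks in `filterSrcStores` and `filterDstStores` is safe under the invariant the first hunk establishes: `summaryStoresLoad` now only creates a `storeLoadDetail` for stores returned by `cluster.GetStores()`, so every entry in `bs.stLoadDetail` already carries a valid `*core.StoreInfo`. Likewise, `pickDstStores` no longer needs the `bs.stLoadDetail[store.GetID()]` lookup, because each candidate is the detail itself. A hedged sketch of that shape (simplified types and a made-up tolerance check, not the real PD logic):

```go
package main

import "fmt"

// Simplified stand-ins; not the real PD types.
type StoreInfo struct{ ID uint64 }

type LoadDetail struct {
	Store   *StoreInfo
	MaxLoad float64
}

// pickDst mirrors the shape of the new pickDstStores: candidates are
// []*LoadDetail, so the store comes from detail.Store and no second map
// lookup is needed. The tolerance check is a placeholder for the real
// per-dimension comparison.
func pickDst(candidates []*LoadDetail, tolerance, expect float64) map[uint64]*LoadDetail {
	ret := make(map[uint64]*LoadDetail, len(candidates))
	for _, detail := range candidates {
		store := detail.Store // cached at summary time; no GetStore call
		if detail.MaxLoad*tolerance < expect {
			ret[store.ID] = detail
		}
	}
	return ret
}

func main() {
	cands := []*LoadDetail{
		{Store: &StoreInfo{ID: 1}, MaxLoad: 5},
		{Store: &StoreInfo{ID: 2}, MaxLoad: 50},
	}
	fmt.Println(len(pickDst(cands, 1.05, 20))) // prints 1: only store 1 passes
}
```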
3 changes: 3 additions & 0 deletions server/schedulers/shuffle_hot_region.go
@@ -132,17 +132,20 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
 }
 
 func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
+	stores := cluster.GetStores()
 	storesLoads := cluster.GetStoresLoads()
 	switch typ {
 	case read:
 		s.stLoadInfos[readLeader] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			map[uint64]*Influence{},
 			cluster.RegionReadStats(),
 			read, core.LeaderKind)
 		return s.randomSchedule(cluster, s.stLoadInfos[readLeader])
 	case write:
 		s.stLoadInfos[writeLeader] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			map[uint64]*Influence{},
 			cluster.RegionWriteStats(),
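
The shuffle-hot-region scheduler only changes at its call sites: it shares `summaryStoresLoad` with the hot-region scheduler, so it must pass the same stores slice, and it continues to supply an empty `map[uint64]*Influence{}` since it tracks no pending influence of its own.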
1 change: 1 addition & 0 deletions server/schedulers/utils.go
@@ -395,6 +395,7 @@ func maxLoad(a, b *storeLoad) *storeLoad {
 }
 
 type storeLoadDetail struct {
+	Store    *core.StoreInfo
 	LoadPred *storeLoadPred
 	HotPeers []*statistics.HotPeerStat
 }
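
Adding the `Store` field makes `storeLoadDetail` self-contained: whoever holds a detail also holds the store it describes. A likely side effect (an inference, not stated in the commit message) is that each scheduling pass now works against the store snapshot taken by `GetStores()` at the start of the pass, rather than observing stores appearing or disappearing between the repeated `GetStore` calls it replaces.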
