diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index bc057fc718b..23275336d16 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -466,7 +466,7 @@ func (c *coordinator) stop() { // TODO: remove it. type hasHotStatus interface { GetHotStatus(statistics.RWType) *statistics.StoreHotPeersInfos - GetPendingInfluence() map[uint64]*schedulers.Influence + GetPendingInfluence() map[uint64]*statistics.Influence } func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos { diff --git a/server/schedulers/grant_hot_region.go b/server/schedulers/grant_hot_region.go index 65b8090dba8..d7ccd459b5d 100644 --- a/server/schedulers/grant_hot_region.go +++ b/server/schedulers/grant_hot_region.go @@ -159,7 +159,7 @@ type grantHotRegionScheduler struct { conf *grantHotRegionSchedulerConfig handler http.Handler types []statistics.RWType - stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail + stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail } // newGrantHotRegionScheduler creates an admin scheduler that transfers hot region peer to fixed store and hot region leader to one store. @@ -174,7 +174,7 @@ func newGrantHotRegionScheduler(opController *schedule.OperatorController, conf types: []statistics.RWType{statistics.Read, statistics.Write}, } for ty := resourceType(0); ty < resourceTypeLen; ty++ { - ret.stLoadInfos[ty] = map[uint64]*storeLoadDetail{} + ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{} } return ret } @@ -274,11 +274,11 @@ func (s *grantHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Oper } func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster opt.Cluster) []*operator.Operator { - storeInfos := summaryStoreInfos(cluster) + storeInfos := statistics.SummaryStoreInfos(cluster) storesLoads := cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow() - var stLoadInfos map[uint64]*storeLoadDetail + var stLoadInfos map[uint64]*statistics.StoreLoadDetail switch typ { case statistics.Read: stLoadInfos = summaryStoresLoad( @@ -295,7 +295,7 @@ func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster opt.Cl isTraceRegionFlow, statistics.Write, core.RegionKind) } - infos := make([]*storeLoadDetail, len(stLoadInfos)) + infos := make([]*statistics.StoreLoadDetail, len(stLoadInfos)) index := 0 for _, info := range stLoadInfos { infos[index] = info @@ -307,7 +307,7 @@ func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster opt.Cl return s.randomSchedule(cluster, infos) } -func (s *grantHotRegionScheduler) randomSchedule(cluster opt.Cluster, infos []*storeLoadDetail) (ops []*operator.Operator) { +func (s *grantHotRegionScheduler) randomSchedule(cluster opt.Cluster, infos []*statistics.StoreLoadDetail) (ops []*operator.Operator) { isleader := s.r.Int()%2 == 1 for _, detail := range infos { srcStoreID := detail.Info.Store.GetID() diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index e490156528a..9d8829f3ee6 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -95,10 +95,10 @@ type hotScheduler struct { // store information, including pending Influence by resource type // Every time `Schedule()` will recalculate it. - stInfos map[uint64]*storeSummaryInfo + stInfos map[uint64]*statistics.StoreSummaryInfo // temporary states but exported to API or metrics // Every time `Schedule()` will recalculate it. 
- stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail + stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail // config of hot scheduler conf *hotRegionSchedulerConfig @@ -115,7 +115,7 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS conf: conf, } for ty := resourceType(0); ty < resourceTypeLen; ty++ { - ret.stLoadInfos[ty] = map[uint64]*storeLoadDetail{} + ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{} } return ret } @@ -170,7 +170,7 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster opt.Cluster) []*o // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for // each store func (h *hotScheduler) prepareForBalance(typ statistics.RWType, cluster opt.Cluster) { - h.stInfos = summaryStoreInfos(cluster) + h.stInfos = statistics.SummaryStoreInfos(cluster) h.summaryPendingInfluence() storesLoads := cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow() @@ -231,15 +231,15 @@ func (h *hotScheduler) summaryPendingInfluence() { } if from != nil && weight > 0 { - from.addInfluence(&p.origin, -weight) + from.AddInfluence(&p.origin, -weight) } if to != nil && weight > 0 { - to.addInfluence(&p.origin, weight) + to.AddInfluence(&p.origin, weight) } } } -func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool { +func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool { regionID := op.RegionID() _, ok := h.regionPendings[regionID] if ok { @@ -323,18 +323,18 @@ func (h *hotScheduler) balanceHotWriteRegions(cluster opt.Cluster) []*operator.O type balanceSolver struct { sche *hotScheduler cluster opt.Cluster - stLoadDetail map[uint64]*storeLoadDetail + stLoadDetail map[uint64]*statistics.StoreLoadDetail rwTy statistics.RWType opTy opType cur *solution best *solution ops []*operator.Operator - infl Influence + infl statistics.Influence - maxSrc *storeLoad - minDst *storeLoad - rankStep *storeLoad + maxSrc *statistics.StoreLoad + minDst *statistics.StoreLoad + rankStep *statistics.StoreLoad // firstPriority and secondPriority indicate priority of hot schedule // they may be byte(0), key(1), query(2), and always less than dimLen @@ -346,10 +346,10 @@ type balanceSolver struct { } type solution struct { - srcDetail *storeLoadDetail + srcDetail *statistics.StoreLoadDetail srcPeerStat *statistics.HotPeerStat region *core.RegionInfo - dstDetail *storeLoadDetail + dstDetail *statistics.StoreLoadDetail // progressiveRank measures the contribution for balance. // The smaller the rank, the better this solution is. 
@@ -370,20 +370,20 @@ func (bs *balanceSolver) init() { } // And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat - bs.maxSrc = &storeLoad{Loads: make([]float64, statistics.DimLen)} - bs.minDst = &storeLoad{ + bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)} + bs.minDst = &statistics.StoreLoad{ Loads: make([]float64, statistics.DimLen), Count: math.MaxFloat64, } for i := range bs.minDst.Loads { bs.minDst.Loads[i] = math.MaxFloat64 } - maxCur := &storeLoad{Loads: make([]float64, statistics.DimLen)} + maxCur := &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)} for _, detail := range bs.stLoadDetail { - bs.maxSrc = maxLoad(bs.maxSrc, detail.LoadPred.min()) - bs.minDst = minLoad(bs.minDst, detail.LoadPred.max()) - maxCur = maxLoad(maxCur, &detail.LoadPred.Current) + bs.maxSrc = statistics.MaxLoad(bs.maxSrc, detail.LoadPred.Min()) + bs.minDst = statistics.MinLoad(bs.minDst, detail.LoadPred.Max()) + maxCur = statistics.MaxLoad(maxCur, &detail.LoadPred.Current) } rankStepRatios := []float64{ @@ -394,7 +394,7 @@ func (bs *balanceSolver) init() { for i := range stepLoads { stepLoads[i] = maxCur.Loads[i] * rankStepRatios[i] } - bs.rankStep = &storeLoad{ + bs.rankStep = &statistics.StoreLoad{ Loads: stepLoads, Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(), } @@ -510,7 +510,7 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool { default: maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration() } - return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcDetail.getID(), bs.best.dstDetail.getID(), bs.infl, maxZombieDur) + return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcDetail.GetID(), bs.best.dstDetail.GetID(), bs.infl, maxZombieDur) } func (bs *balanceSolver) isForWriteLeader() bool { @@ -523,8 +523,8 @@ func (bs *balanceSolver) isForWritePeer() bool { // filterSrcStores compare the min rate and the ratio * expectation rate, if two dim rate is greater than // its expectation * ratio, the store would be selected as hot source store -func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { - ret := make(map[uint64]*storeLoadDetail) +func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetail { + ret := make(map[uint64]*statistics.StoreLoadDetail) confSrcToleranceRatio := bs.sche.conf.GetSrcToleranceRatio() confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash() for id, detail := range bs.stLoadDetail { @@ -542,7 +542,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { continue } - if bs.checkSrcByDimPriorityAndTolerance(detail.LoadPred.min(), &detail.LoadPred.Expect, srcToleranceRatio) { + if bs.checkSrcByDimPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) { ret[id] = detail hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc() } else { @@ -552,7 +552,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { return ret } -func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *storeLoad, toleranceRatio float64) bool { +func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool { if bs.sche.conf.IsStrictPickingStoreEnabled() { return slice.AllOf(minLoad.Loads, func(i int) bool { if bs.isSelectedDim(i) { @@ -600,13 +600,13 @@ func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat, maxPeerNum firstSort := 
make([]*statistics.HotPeerStat, len(ret)) copy(firstSort, ret) sort.Slice(firstSort, func(i, j int) bool { - k := getRegionStatKind(bs.rwTy, bs.firstPriority) + k := statistics.GetRegionStatKind(bs.rwTy, bs.firstPriority) return firstSort[i].GetLoad(k) > firstSort[j].GetLoad(k) }) secondSort := make([]*statistics.HotPeerStat, len(ret)) copy(secondSort, ret) sort.Slice(secondSort, func(i, j int) bool { - k := getRegionStatKind(bs.rwTy, bs.secondPriority) + k := statistics.GetRegionStatKind(bs.rwTy, bs.secondPriority) return secondSort[i].GetLoad(k) > secondSort[j].GetLoad(k) }) union := make(map[*statistics.HotPeerStat]struct{}, maxPeerNum) @@ -671,13 +671,13 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo { switch bs.opTy { case movePeer: - srcPeer := region.GetStorePeer(bs.cur.srcDetail.getID()) + srcPeer := region.GetStorePeer(bs.cur.srcDetail.GetID()) if srcPeer == nil { log.Debug("region does not have a peer on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID())) return nil } case transferLeader: - if region.GetLeader().GetStoreId() != bs.cur.srcDetail.getID() { + if region.GetLeader().GetStoreId() != bs.cur.srcDetail.GetID() { log.Debug("region leader is not on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID())) return nil } @@ -689,10 +689,10 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo { } // filterDstStores select the candidate store by filters -func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { +func (bs *balanceSolver) filterDstStores() map[uint64]*statistics.StoreLoadDetail { var ( filters []filter.Filter - candidates []*storeLoadDetail + candidates []*statistics.StoreLoadDetail ) srcStore := bs.cur.srcDetail.Info.Store switch bs.opTy { @@ -729,8 +729,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { return bs.pickDstStores(filters, candidates) } -func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail { - ret := make(map[uint64]*storeLoadDetail, len(candidates)) +func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*statistics.StoreLoadDetail) map[uint64]*statistics.StoreLoadDetail { + ret := make(map[uint64]*statistics.StoreLoadDetail, len(candidates)) confDstToleranceRatio := bs.sche.conf.GetDstToleranceRatio() confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash() for _, detail := range candidates { @@ -747,7 +747,7 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st } if filter.Target(bs.cluster.GetOpts(), store, filters) { id := store.GetID() - if bs.checkDstByPriorityAndTolerance(detail.LoadPred.max(), &detail.LoadPred.Expect, dstToleranceRatio) { + if bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) { ret[id] = detail hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc() } else { @@ -758,7 +758,7 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st return ret } -func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *storeLoad, toleranceRatio float64) bool { +func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statistics.StoreLoad, toleranceRatio float64) bool { if bs.sche.conf.IsStrictPickingStoreEnabled() { return slice.AllOf(maxLoad.Loads, func(i int) bool { if bs.isSelectedDim(i) { @@ -775,8 +775,8 @@ func (bs *balanceSolver) 
checkDstByPriorityAndTolerance(maxLoad, expect *storeLo func (bs *balanceSolver) calcProgressiveRank() { src := bs.cur.srcDetail dst := bs.cur.dstDetail - srcLd := src.LoadPred.min() - dstLd := dst.LoadPred.max() + srcLd := src.LoadPred.Min() + dstLd := dst.LoadPred.Max() bs.cur.progressiveRank = 0 peer := bs.cur.srcPeerStat @@ -786,7 +786,7 @@ func (bs *balanceSolver) calcProgressiveRank() { } srcRate := srcLd.Loads[bs.firstPriority] dstRate := dstLd.Loads[bs.firstPriority] - peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.firstPriority)) + peerRate := peer.GetLoad(statistics.GetRegionStatKind(bs.rwTy, bs.firstPriority)) if srcRate-peerRate >= dstRate+peerRate { bs.cur.progressiveRank = -1 } @@ -822,20 +822,20 @@ func (bs *balanceSolver) calcProgressiveRank() { // isTolerance checks source store and target store by checking the difference value with pendingAmpFactor * pendingPeer. // This will make the hot region scheduling slow even serializely running when each 2 store's pending influence is close. -func (bs *balanceSolver) isTolerance(src, dst *storeLoadDetail, dim int) bool { +func (bs *balanceSolver) isTolerance(src, dst *statistics.StoreLoadDetail, dim int) bool { srcRate := src.LoadPred.Current.Loads[dim] dstRate := dst.LoadPred.Current.Loads[dim] if srcRate <= dstRate { return false } pendingAmp := (1 + pendingAmpFactor*srcRate/(srcRate-dstRate)) - srcPending := src.LoadPred.pending().Loads[dim] - dstPending := dst.LoadPred.pending().Loads[dim] - hotPendingStatus.WithLabelValues(bs.rwTy.String(), strconv.FormatUint(src.getID(), 10), strconv.FormatUint(dst.getID(), 10)).Set(pendingAmp) + srcPending := src.LoadPred.Pending().Loads[dim] + dstPending := dst.LoadPred.Pending().Loads[dim] + hotPendingStatus.WithLabelValues(bs.rwTy.String(), strconv.FormatUint(src.GetID(), 10), strconv.FormatUint(dst.GetID(), 10)).Set(pendingAmp) return srcRate-pendingAmp*srcPending > dstRate+pendingAmp*dstPending } -func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *storeLoad, peer *statistics.HotPeerStat) (bool, float64, bool, float64) { +func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *statistics.StoreLoad, peer *statistics.HotPeerStat) (bool, float64, bool, float64) { // we use DecRatio(Decline Ratio) to expect that the dst store's rate should still be less // than the src store's rate after scheduling one peer. 
getSrcDecRate := func(a, b float64) float64 { @@ -847,7 +847,7 @@ func (bs *balanceSolver) getHotDecRatioByPriorities(srcLd, dstLd *storeLoad, pee checkHot := func(dim int) (bool, float64) { srcRate := srcLd.Loads[dim] dstRate := dstLd.Loads[dim] - peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, dim)) + peerRate := peer.GetLoad(statistics.GetRegionStatKind(bs.rwTy, dim)) decRatio := (dstRate + peerRate) / getSrcDecRate(srcRate, peerRate) isHot := peerRate >= bs.getMinRate(dim) return isHot, decRatio @@ -898,7 +898,7 @@ func (bs *balanceSolver) betterThan(old *solution) bool { // compare region if bs.isForWriteLeader() { - kind := getRegionStatKind(statistics.Write, bs.firstPriority) + kind := statistics.GetRegionStatKind(statistics.Write, bs.firstPriority) switch { case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind): return true @@ -934,7 +934,7 @@ func (bs *balanceSolver) betterThan(old *solution) bool { } func (bs *balanceSolver) getRkCmpPriorities(old *solution) (firstCmp int, secondCmp int) { - fk, sk := getRegionStatKind(bs.rwTy, bs.firstPriority), getRegionStatKind(bs.rwTy, bs.secondPriority) + fk, sk := statistics.GetRegionStatKind(bs.rwTy, bs.firstPriority), statistics.GetRegionStatKind(bs.rwTy, bs.secondPriority) dimToStep := func(priority int) float64 { switch priority { case statistics.ByteDim: @@ -952,7 +952,7 @@ func (bs *balanceSolver) getRkCmpPriorities(old *solution) (firstCmp int, second } // smaller is better -func (bs *balanceSolver) compareSrcStore(detail1, detail2 *storeLoadDetail) int { +func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadDetail) int { if detail1 != detail2 { // compare source store var lpCmp storeLPCmp @@ -985,7 +985,7 @@ func (bs *balanceSolver) compareSrcStore(detail1, detail2 *storeLoadDetail) int } // smaller is better -func (bs *balanceSolver) compareDstStore(detail1, detail2 *storeLoadDetail) int { +func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadDetail) int { if detail1 != detail2 { // compare destination store var lpCmp storeLPCmp @@ -1027,14 +1027,14 @@ func (bs *balanceSolver) isReadyToBuild() bool { bs.cur.srcPeerStat == nil || bs.cur.region == nil { return false } - if bs.cur.srcDetail.getID() != bs.cur.srcPeerStat.StoreID || + if bs.cur.srcDetail.GetID() != bs.cur.srcPeerStat.StoreID || bs.cur.region.GetID() != bs.cur.srcPeerStat.ID() { return false } return true } -func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence) { +func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistics.Influence) { if !bs.isReadyToBuild() { return nil, nil } @@ -1045,8 +1045,8 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence targetLabel string ) - srcStoreID := bs.cur.srcDetail.getID() - dstStoreID := bs.cur.dstDetail.getID() + srcStoreID := bs.cur.srcDetail.GetID() + dstStoreID := bs.cur.dstDetail.GetID() switch bs.opTy { case movePeer: srcPeer := bs.cur.region.GetStorePeer(srcStoreID) // checked in getRegionAndSrcPeer @@ -1115,7 +1115,7 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"), schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String())) - infl = &Influence{ + infl = &statistics.Influence{ Loads: append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...), Count: 1, } @@ -1135,10 +1135,10 @@ func (h *hotScheduler) GetHotStatus(typ statistics.RWType) 
*statistics.StoreHotP asLeader := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[leaderTyp])) asPeer := make(statistics.StoreHotPeersStat, len(h.stLoadInfos[peerTyp])) for id, detail := range h.stLoadInfos[leaderTyp] { - asLeader[id] = detail.toHotPeersStat() + asLeader[id] = detail.ToHotPeersStat() } for id, detail := range h.stLoadInfos[peerTyp] { - asPeer[id] = detail.toHotPeersStat() + asPeer[id] = detail.ToHotPeersStat() } return &statistics.StoreHotPeersInfos{ AsLeader: asLeader, @@ -1146,10 +1146,10 @@ func (h *hotScheduler) GetHotStatus(typ statistics.RWType) *statistics.StoreHotP } } -func (h *hotScheduler) GetPendingInfluence() map[uint64]*Influence { +func (h *hotScheduler) GetPendingInfluence() map[uint64]*statistics.Influence { h.RLock() defer h.RUnlock() - ret := make(map[uint64]*Influence, len(h.stInfos)) + ret := make(map[uint64]*statistics.Influence, len(h.stInfos)) for id, info := range h.stInfos { if info.PendingSum != nil { ret[id] = info.PendingSum @@ -1234,24 +1234,6 @@ func toResourceType(rwTy statistics.RWType, opTy opType) resourceType { panic(fmt.Sprintf("invalid arguments for toResourceType: rwTy = %v, opTy = %v", rwTy, opTy)) } -func getRegionStatKind(rwTy statistics.RWType, dim int) statistics.RegionStatKind { - switch { - case rwTy == statistics.Read && dim == statistics.ByteDim: - return statistics.RegionReadBytes - case rwTy == statistics.Read && dim == statistics.KeyDim: - return statistics.RegionReadKeys - case rwTy == statistics.Write && dim == statistics.ByteDim: - return statistics.RegionWriteBytes - case rwTy == statistics.Write && dim == statistics.KeyDim: - return statistics.RegionWriteKeys - case rwTy == statistics.Write && dim == statistics.QueryDim: - return statistics.RegionWriteQuery - case rwTy == statistics.Read && dim == statistics.QueryDim: - return statistics.RegionReadQuery - } - return 0 -} - func stringToDim(name string) int { switch name { case BytePriority: diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index e21666b7a16..dd7192cba55 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -100,7 +100,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { op.Start() operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second)) operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second)) - return newPendingInfluence(op, 2, 4, Influence{}, hb.conf.GetStoreStatZombieDuration()) + return newPendingInfluence(op, 2, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration()) } justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence { infl := notDoneOpInfluence(region, ty) diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index 9c48028ac0c..2f2cb05bd52 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -78,7 +78,7 @@ type shuffleHotRegionSchedulerConfig struct { // the hot peer. 
type shuffleHotRegionScheduler struct { *BaseScheduler - stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail + stLoadInfos [resourceTypeLen]map[uint64]*statistics.StoreLoadDetail r *rand.Rand conf *shuffleHotRegionSchedulerConfig types []statistics.RWType @@ -94,7 +94,7 @@ func newShuffleHotRegionScheduler(opController *schedule.OperatorController, con r: rand.New(rand.NewSource(time.Now().UnixNano())), } for ty := resourceType(0); ty < resourceTypeLen; ty++ { - ret.stLoadInfos[ty] = map[uint64]*storeLoadDetail{} + ret.stLoadInfos[ty] = map[uint64]*statistics.StoreLoadDetail{} } return ret } @@ -134,7 +134,7 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op } func (s *shuffleHotRegionScheduler) dispatch(typ statistics.RWType, cluster opt.Cluster) []*operator.Operator { - storeInfos := summaryStoreInfos(cluster) + storeInfos := statistics.SummaryStoreInfos(cluster) storesLoads := cluster.GetStoresLoads() isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow() @@ -159,7 +159,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ statistics.RWType, cluster opt. return nil } -func (s *shuffleHotRegionScheduler) randomSchedule(cluster opt.Cluster, loadDetail map[uint64]*storeLoadDetail) []*operator.Operator { +func (s *shuffleHotRegionScheduler) randomSchedule(cluster opt.Cluster, loadDetail map[uint64]*statistics.StoreLoadDetail) []*operator.Operator { for _, detail := range loadDetail { if len(detail.HotPeers) < 1 { continue diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index dafb6754453..7440c27d2f6 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -16,7 +16,6 @@ package schedulers import ( "fmt" - "math" "net/url" "strconv" "time" @@ -202,20 +201,14 @@ func getKeyRanges(args []string) ([]core.KeyRange, error) { return ranges, nil } -// Influence records operator influence. 
-type Influence struct { - Loads []float64 - Count float64 -} - type pendingInfluence struct { op *operator.Operator from, to uint64 - origin Influence + origin statistics.Influence maxZombieDuration time.Duration } -func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, maxZombieDur time.Duration) *pendingInfluence { +func newPendingInfluence(op *operator.Operator, from, to uint64, infl statistics.Influence, maxZombieDur time.Duration) *pendingInfluence { return &pendingInfluence{ op: op, from: from, @@ -225,55 +218,26 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, } } -type storeLoad struct { - Loads []float64 - Count float64 -} - -func (load storeLoad) ToLoadPred(rwTy statistics.RWType, infl *Influence) *storeLoadPred { - future := storeLoad{ - Loads: append(load.Loads[:0:0], load.Loads...), - Count: load.Count, - } - if infl != nil { - switch rwTy { - case statistics.Read: - future.Loads[statistics.ByteDim] += infl.Loads[statistics.RegionReadBytes] - future.Loads[statistics.KeyDim] += infl.Loads[statistics.RegionReadKeys] - future.Loads[statistics.QueryDim] += infl.Loads[statistics.RegionReadQuery] - case statistics.Write: - future.Loads[statistics.ByteDim] += infl.Loads[statistics.RegionWriteBytes] - future.Loads[statistics.KeyDim] += infl.Loads[statistics.RegionWriteKeys] - future.Loads[statistics.QueryDim] += infl.Loads[statistics.RegionWriteQuery] - } - future.Count += infl.Count - } - return &storeLoadPred{ - Current: load, - Future: future, - } -} - -func stLdRate(dim int) func(ld *storeLoad) float64 { - return func(ld *storeLoad) float64 { +func stLdRate(dim int) func(ld *statistics.StoreLoad) float64 { + return func(ld *statistics.StoreLoad) float64 { return ld.Loads[dim] } } -func stLdCount(ld *storeLoad) float64 { +func stLdCount(ld *statistics.StoreLoad) float64 { return ld.Count } -type storeLoadCmp func(ld1, ld2 *storeLoad) int +type storeLoadCmp func(ld1, ld2 *statistics.StoreLoad) int func negLoadCmp(cmp storeLoadCmp) storeLoadCmp { - return func(ld1, ld2 *storeLoad) int { + return func(ld1, ld2 *statistics.StoreLoad) int { return -cmp(ld1, ld2) } } func sliceLoadCmp(cmps ...storeLoadCmp) storeLoadCmp { - return func(ld1, ld2 *storeLoad) int { + return func(ld1, ld2 *statistics.StoreLoad) int { for _, cmp := range cmps { if r := cmp(ld1, ld2); r != 0 { return r @@ -283,8 +247,8 @@ func sliceLoadCmp(cmps ...storeLoadCmp) storeLoadCmp { } } -func stLdRankCmp(dim func(ld *storeLoad) float64, rank func(value float64) int64) storeLoadCmp { - return func(ld1, ld2 *storeLoad) int { +func stLdRankCmp(dim func(ld *statistics.StoreLoad) float64, rank func(value float64) int64) storeLoadCmp { + return func(ld1, ld2 *statistics.StoreLoad) int { return rankCmp(dim(ld1), dim(ld2), rank) } } @@ -299,49 +263,10 @@ func rankCmp(a, b float64, rank func(value float64) int64) int { return 0 } -// store load prediction -type storeLoadPred struct { - Current storeLoad - Future storeLoad - Expect storeLoad -} - -func (lp *storeLoadPred) min() *storeLoad { - return minLoad(&lp.Current, &lp.Future) -} - -func (lp *storeLoadPred) max() *storeLoad { - return maxLoad(&lp.Current, &lp.Future) -} - -func (lp *storeLoadPred) pending() *storeLoad { - mx, mn := lp.max(), lp.min() - loads := make([]float64, len(mx.Loads)) - for i := range loads { - loads[i] = mx.Loads[i] - mn.Loads[i] - } - return &storeLoad{ - Loads: loads, - Count: 0, - } -} - -func (lp *storeLoadPred) diff() *storeLoad { - mx, mn := lp.max(), lp.min() - loads := 
make([]float64, len(mx.Loads)) - for i := range loads { - loads[i] = mx.Loads[i] - mn.Loads[i] - } - return &storeLoad{ - Loads: loads, - Count: mx.Count - mn.Count, - } -} - -type storeLPCmp func(lp1, lp2 *storeLoadPred) int +type storeLPCmp func(lp1, lp2 *statistics.StoreLoadPred) int func sliceLPCmp(cmps ...storeLPCmp) storeLPCmp { - return func(lp1, lp2 *storeLoadPred) int { + return func(lp1, lp2 *statistics.StoreLoadPred) int { for _, cmp := range cmps { if r := cmp(lp1, lp2); r != 0 { return r @@ -352,150 +277,20 @@ func sliceLPCmp(cmps ...storeLPCmp) storeLPCmp { } func minLPCmp(ldCmp storeLoadCmp) storeLPCmp { - return func(lp1, lp2 *storeLoadPred) int { - return ldCmp(lp1.min(), lp2.min()) + return func(lp1, lp2 *statistics.StoreLoadPred) int { + return ldCmp(lp1.Min(), lp2.Min()) } } func maxLPCmp(ldCmp storeLoadCmp) storeLPCmp { - return func(lp1, lp2 *storeLoadPred) int { - return ldCmp(lp1.max(), lp2.max()) + return func(lp1, lp2 *statistics.StoreLoadPred) int { + return ldCmp(lp1.Max(), lp2.Max()) } } func diffCmp(ldCmp storeLoadCmp) storeLPCmp { - return func(lp1, lp2 *storeLoadPred) int { - return ldCmp(lp1.diff(), lp2.diff()) - } -} - -func minLoad(a, b *storeLoad) *storeLoad { - loads := make([]float64, len(a.Loads)) - for i := range loads { - loads[i] = math.Min(a.Loads[i], b.Loads[i]) - } - return &storeLoad{ - Loads: loads, - Count: math.Min(a.Count, b.Count), - } -} - -func maxLoad(a, b *storeLoad) *storeLoad { - loads := make([]float64, len(a.Loads)) - for i := range loads { - loads[i] = math.Max(a.Loads[i], b.Loads[i]) - } - return &storeLoad{ - Loads: loads, - Count: math.Max(a.Count, b.Count), - } -} - -type storeSummaryInfo struct { - Store *core.StoreInfo - IsTiFlash bool - PendingSum *Influence -} - -func summaryStoreInfos(cluster opt.Cluster) map[uint64]*storeSummaryInfo { - stores := cluster.GetStores() - infos := make(map[uint64]*storeSummaryInfo, len(stores)) - for _, store := range stores { - info := &storeSummaryInfo{ - Store: store, - IsTiFlash: core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash), - PendingSum: nil, - } - infos[store.GetID()] = info - } - return infos -} - -func (s *storeSummaryInfo) addInfluence(infl *Influence, w float64) { - if infl == nil || w == 0 { - return - } - if s.PendingSum == nil { - s.PendingSum = &Influence{ - Loads: make([]float64, len(infl.Loads)), - Count: 0, - } - } - for i, load := range infl.Loads { - s.PendingSum.Loads[i] += load * w - } - s.PendingSum.Count += infl.Count * w -} - -type storeLoadDetail struct { - Info *storeSummaryInfo - LoadPred *storeLoadPred - HotPeers []*statistics.HotPeerStat -} - -func (li *storeLoadDetail) getID() uint64 { - return li.Info.Store.GetID() -} - -func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat { - totalLoads := make([]float64, statistics.RegionStatCount) - if len(li.HotPeers) == 0 { - return &statistics.HotPeersStat{ - TotalLoads: totalLoads, - TotalBytesRate: 0.0, - TotalKeysRate: 0.0, - TotalQueryRate: 0.0, - Count: 0, - Stats: make([]statistics.HotPeerStatShow, 0), - } - } - kind := statistics.Write - if li.HotPeers[0].Kind == statistics.Read { - kind = statistics.Read - } - - peers := make([]statistics.HotPeerStatShow, 0, len(li.HotPeers)) - for _, peer := range li.HotPeers { - if peer.HotDegree > 0 { - peers = append(peers, toHotPeerStatShow(peer, kind)) - for i := range totalLoads { - totalLoads[i] += peer.GetLoad(statistics.RegionStatKind(i)) - } - } - } - - b, k, q := getRegionStatKind(kind, statistics.ByteDim), 
getRegionStatKind(kind, statistics.KeyDim), getRegionStatKind(kind, statistics.QueryDim) - byteRate, keyRate, queryRate := totalLoads[b], totalLoads[k], totalLoads[q] - storeByteRate, storeKeyRate, storeQueryRate := li.LoadPred.Current.Loads[statistics.ByteDim], - li.LoadPred.Current.Loads[statistics.KeyDim], li.LoadPred.Current.Loads[statistics.QueryDim] - - return &statistics.HotPeersStat{ - TotalLoads: totalLoads, - TotalBytesRate: byteRate, - TotalKeysRate: keyRate, - TotalQueryRate: queryRate, - StoreByteRate: storeByteRate, - StoreKeyRate: storeKeyRate, - StoreQueryRate: storeQueryRate, - Count: len(peers), - Stats: peers, - } -} - -func toHotPeerStatShow(p *statistics.HotPeerStat, kind statistics.RWType) statistics.HotPeerStatShow { - b, k, q := getRegionStatKind(kind, statistics.ByteDim), getRegionStatKind(kind, statistics.KeyDim), getRegionStatKind(kind, statistics.QueryDim) - byteRate := p.Loads[b] - keyRate := p.Loads[k] - queryRate := p.Loads[q] - return statistics.HotPeerStatShow{ - StoreID: p.StoreID, - RegionID: p.RegionID, - HotDegree: p.HotDegree, - ByteRate: byteRate, - KeyRate: keyRate, - QueryRate: queryRate, - AntiCount: p.AntiCount, - LastUpdateTime: p.LastUpdateTime, + return func(lp1, lp2 *statistics.StoreLoadPred) int { + return ldCmp(lp1.Diff(), lp2.Diff()) } } @@ -504,7 +299,7 @@ type storeCollector interface { // Engine returns the type of Store. Engine() string // Filter determines whether the Store needs to be handled by itself. - Filter(info *storeSummaryInfo, kind core.ResourceKind) bool + Filter(info *statistics.StoreSummaryInfo, kind core.ResourceKind) bool // GetLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind. GetLoads(storeLoads, peerLoadSum []float64, rwTy statistics.RWType, kind core.ResourceKind) (loads []float64) } @@ -519,7 +314,7 @@ func (c tikvCollector) Engine() string { return core.EngineTiKV } -func (c tikvCollector) Filter(info *storeSummaryInfo, kind core.ResourceKind) bool { +func (c tikvCollector) Filter(info *statistics.StoreSummaryInfo, kind core.ResourceKind) bool { if info.IsTiFlash { return false } @@ -570,7 +365,7 @@ func (c tiflashCollector) Engine() string { return core.EngineTiFlash } -func (c tiflashCollector) Filter(info *storeSummaryInfo, kind core.ResourceKind) bool { +func (c tiflashCollector) Filter(info *statistics.StoreSummaryInfo, kind core.ResourceKind) bool { switch kind { case core.LeaderKind: return false @@ -599,7 +394,7 @@ func (c tiflashCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy stati loads[statistics.ByteDim] = peerLoadSum[statistics.ByteDim] loads[statistics.KeyDim] = peerLoadSum[statistics.KeyDim] } - // The `write-peer` does not have `QueryDim` + // The `write-peer` does not have `QueryDim` } } return @@ -608,15 +403,15 @@ func (c tiflashCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy stati // summaryStoresLoad Load information of all available stores.
// it will filter the hot peer and calculate the current and future stat(rate,count) for each store func summaryStoresLoad( - storeInfos map[uint64]*storeSummaryInfo, + storeInfos map[uint64]*statistics.StoreSummaryInfo, storesLoads map[uint64][]float64, storeHotPeers map[uint64][]*statistics.HotPeerStat, isTraceRegionFlow bool, rwTy statistics.RWType, kind core.ResourceKind, -) map[uint64]*storeLoadDetail { +) map[uint64]*statistics.StoreLoadDetail { // loadDetail stores the storeID -> hotPeers stat and its current and future stat(rate,count) - loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads)) + loadDetail := make(map[uint64]*statistics.StoreLoadDetail, len(storesLoads)) tikvLoadDetail := summaryStoresLoadByEngine( storeInfos, @@ -634,20 +429,20 @@ func summaryStoresLoad( ) for _, detail := range append(tikvLoadDetail, tiflashLoadDetail...) { - loadDetail[detail.getID()] = detail + loadDetail[detail.GetID()] = detail } return loadDetail } func summaryStoresLoadByEngine( - storeInfos map[uint64]*storeSummaryInfo, + storeInfos map[uint64]*statistics.StoreSummaryInfo, storesLoads map[uint64][]float64, storeHotPeers map[uint64][]*statistics.HotPeerStat, rwTy statistics.RWType, kind core.ResourceKind, collector storeCollector, -) []*storeLoadDetail { - loadDetail := make([]*storeLoadDetail, 0, len(storeInfos)) +) []*statistics.StoreLoadDetail { + loadDetail := make([]*statistics.StoreLoadDetail, 0, len(storeInfos)) allStoreLoadSum := make([]float64, statistics.DimLen) allStoreCount := 0 allHotPeersCount := 0 @@ -667,7 +462,7 @@ func summaryStoresLoadByEngine( // HotLeaders consider `Write{Bytes,Keys}`, so when we schedule `writeLeader`, all peers are leader. for _, peer := range filterHotPeers(kind, storeHotPeers[id]) { for i := range peerLoadSum { - peerLoadSum[i] += peer.GetLoad(getRegionStatKind(rwTy, i)) + peerLoadSum[i] += peer.GetLoad(statistics.GetRegionStatKind(rwTy, i)) } hotPeers = append(hotPeers, peer.Clone()) } @@ -689,13 +484,13 @@ func summaryStoresLoadByEngine( allHotPeersCount += len(hotPeers) // Build store load prediction from current load and pending influence. - stLoadPred := (&storeLoad{ + stLoadPred := (&statistics.StoreLoad{ Loads: loads, Count: float64(len(hotPeers)), }).ToLoadPred(rwTy, info.PendingSum) // Construct store load info. - loadDetail = append(loadDetail, &storeLoadDetail{ + loadDetail = append(loadDetail, &statistics.StoreLoadDetail{ Info: info, LoadPred: stLoadPred, HotPeers: hotPeers, @@ -723,7 +518,7 @@ func summaryStoresLoadByEngine( ty = "exp-count-rate-" + rwTy.String() + "-" + kind.String() hotPeerSummary.WithLabelValues(ty, engine).Set(expectCount) } - expect := storeLoad{ + expect := statistics.StoreLoad{ Loads: expectLoads, Count: float64(allHotPeersCount) / float64(allStoreCount), } diff --git a/server/statistics/kind.go b/server/statistics/kind.go index e663d1ce56c..e5965fcbfcb 100644 --- a/server/statistics/kind.go +++ b/server/statistics/kind.go @@ -119,7 +119,7 @@ func (k sourceKind) String() string { } // RWType is a identify hot region types. -type RWType uint32 +type RWType int // Flags for r/w type. const ( diff --git a/server/statistics/store_load.go b/server/statistics/store_load.go new file mode 100644 index 00000000000..126665c9d23 --- /dev/null +++ b/server/statistics/store_load.go @@ -0,0 +1,258 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "math" + + "github.com/tikv/pd/server/core" +) + +// StoreLoadDetail records store load information. +type StoreLoadDetail struct { + Info *StoreSummaryInfo + LoadPred *StoreLoadPred + HotPeers []*HotPeerStat +} + +// GetID returns the ID of the store. +func (li *StoreLoadDetail) GetID() uint64 { + return li.Info.Store.GetID() +} + +// ToHotPeersStat abstracts load information to HotPeersStat. +func (li *StoreLoadDetail) ToHotPeersStat() *HotPeersStat { + totalLoads := make([]float64, RegionStatCount) + if len(li.HotPeers) == 0 { + return &HotPeersStat{ + TotalLoads: totalLoads, + TotalBytesRate: 0.0, + TotalKeysRate: 0.0, + TotalQueryRate: 0.0, + Count: 0, + Stats: make([]HotPeerStatShow, 0), + } + } + kind := Write + if li.HotPeers[0].Kind == Read { + kind = Read + } + + peers := make([]HotPeerStatShow, 0, len(li.HotPeers)) + for _, peer := range li.HotPeers { + if peer.HotDegree > 0 { + peers = append(peers, toHotPeerStatShow(peer, kind)) + for i := range totalLoads { + totalLoads[i] += peer.GetLoad(RegionStatKind(i)) + } + } + } + + b, k, q := GetRegionStatKind(kind, ByteDim), GetRegionStatKind(kind, KeyDim), GetRegionStatKind(kind, QueryDim) + byteRate, keyRate, queryRate := totalLoads[b], totalLoads[k], totalLoads[q] + storeByteRate, storeKeyRate, storeQueryRate := li.LoadPred.Current.Loads[ByteDim], + li.LoadPred.Current.Loads[KeyDim], li.LoadPred.Current.Loads[QueryDim] + + return &HotPeersStat{ + TotalLoads: totalLoads, + TotalBytesRate: byteRate, + TotalKeysRate: keyRate, + TotalQueryRate: queryRate, + StoreByteRate: storeByteRate, + StoreKeyRate: storeKeyRate, + StoreQueryRate: storeQueryRate, + Count: len(peers), + Stats: peers, + } +} + +func toHotPeerStatShow(p *HotPeerStat, kind RWType) HotPeerStatShow { + b, k, q := GetRegionStatKind(kind, ByteDim), GetRegionStatKind(kind, KeyDim), GetRegionStatKind(kind, QueryDim) + byteRate := p.Loads[b] + keyRate := p.Loads[k] + queryRate := p.Loads[q] + return HotPeerStatShow{ + StoreID: p.StoreID, + RegionID: p.RegionID, + HotDegree: p.HotDegree, + ByteRate: byteRate, + KeyRate: keyRate, + QueryRate: queryRate, + AntiCount: p.AntiCount, + LastUpdateTime: p.LastUpdateTime, + } +} + +// GetRegionStatKind gets region statistics kind. +func GetRegionStatKind(rwTy RWType, dim int) RegionStatKind { + switch { + case rwTy == Read && dim == ByteDim: + return RegionReadBytes + case rwTy == Read && dim == KeyDim: + return RegionReadKeys + case rwTy == Write && dim == ByteDim: + return RegionWriteBytes + case rwTy == Write && dim == KeyDim: + return RegionWriteKeys + case rwTy == Write && dim == QueryDim: + return RegionWriteQuery + case rwTy == Read && dim == QueryDim: + return RegionReadQuery + } + return 0 +} + +// StoreSummaryInfo records the summary information of store. +type StoreSummaryInfo struct { + Store *core.StoreInfo + IsTiFlash bool + PendingSum *Influence +} + +// SummaryStoreInfos returns a mapping from store to summary information.
+func SummaryStoreInfos(cluster core.StoreSetInformer) map[uint64]*StoreSummaryInfo { + stores := cluster.GetStores() + infos := make(map[uint64]*StoreSummaryInfo, len(stores)) + for _, store := range stores { + info := &StoreSummaryInfo{ + Store: store, + IsTiFlash: core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash), + PendingSum: nil, + } + infos[store.GetID()] = info + } + return infos +} + +// AddInfluence adds influence to pending sum. +func (s *StoreSummaryInfo) AddInfluence(infl *Influence, w float64) { + if infl == nil || w == 0 { + return + } + if s.PendingSum == nil { + s.PendingSum = &Influence{ + Loads: make([]float64, len(infl.Loads)), + Count: 0, + } + } + for i, load := range infl.Loads { + s.PendingSum.Loads[i] += load * w + } + s.PendingSum.Count += infl.Count * w +} + +// Influence records operator influence. +type Influence struct { + Loads []float64 + Count float64 +} + +// StoreLoad records the current load. +type StoreLoad struct { + Loads []float64 + Count float64 +} + +// ToLoadPred returns the current load and future predictive load. +func (load StoreLoad) ToLoadPred(rwTy RWType, infl *Influence) *StoreLoadPred { + future := StoreLoad{ + Loads: append(load.Loads[:0:0], load.Loads...), + Count: load.Count, + } + if infl != nil { + switch rwTy { + case Read: + future.Loads[ByteDim] += infl.Loads[RegionReadBytes] + future.Loads[KeyDim] += infl.Loads[RegionReadKeys] + future.Loads[QueryDim] += infl.Loads[RegionReadQuery] + case Write: + future.Loads[ByteDim] += infl.Loads[RegionWriteBytes] + future.Loads[KeyDim] += infl.Loads[RegionWriteKeys] + future.Loads[QueryDim] += infl.Loads[RegionWriteQuery] + } + future.Count += infl.Count + } + return &StoreLoadPred{ + Current: load, + Future: future, + } +} + +// StoreLoadPred is a prediction of a store. +type StoreLoadPred struct { + Current StoreLoad + Future StoreLoad + Expect StoreLoad +} + +// Min returns the min load between current and future. +func (lp *StoreLoadPred) Min() *StoreLoad { + return MinLoad(&lp.Current, &lp.Future) +} + +// Max returns the max load between current and future. +func (lp *StoreLoadPred) Max() *StoreLoad { + return MaxLoad(&lp.Current, &lp.Future) +} + +// Pending returns the pending load. +func (lp *StoreLoadPred) Pending() *StoreLoad { + mx, mn := lp.Max(), lp.Min() + loads := make([]float64, len(mx.Loads)) + for i := range loads { + loads[i] = mx.Loads[i] - mn.Loads[i] + } + return &StoreLoad{ + Loads: loads, + Count: 0, + } +} + +// Diff returns the difference between min and max. +func (lp *StoreLoadPred) Diff() *StoreLoad { + mx, mn := lp.Max(), lp.Min() + loads := make([]float64, len(mx.Loads)) + for i := range loads { + loads[i] = mx.Loads[i] - mn.Loads[i] + } + return &StoreLoad{ + Loads: loads, + Count: mx.Count - mn.Count, + } +} + +// MinLoad returns the min store load. +func MinLoad(a, b *StoreLoad) *StoreLoad { + loads := make([]float64, len(a.Loads)) + for i := range loads { + loads[i] = math.Min(a.Loads[i], b.Loads[i]) + } + return &StoreLoad{ + Loads: loads, + Count: math.Min(a.Count, b.Count), + } +} + +// MaxLoad returns the max store load. +func MaxLoad(a, b *StoreLoad) *StoreLoad { + loads := make([]float64, len(a.Loads)) + for i := range loads { + loads[i] = math.Max(a.Loads[i], b.Loads[i]) + } + return &StoreLoad{ + Loads: loads, + Count: math.Max(a.Count, b.Count), + } +}
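
Reviewer note, not part of the diff: a minimal Go sketch of how the newly exported statistics types compose. It only uses identifiers added in store_load.go plus fmt; the byte/key/query numbers are placeholders, and the import path assumes the tikv/pd module at this revision.

package main

import (
	"fmt"

	"github.com/tikv/pd/server/statistics"
)

func main() {
	// Reported load of one store, indexed by dimension (ByteDim, KeyDim, QueryDim).
	load := statistics.StoreLoad{
		Loads: []float64{512, 256, 64}, // illustrative byte/key/query rates
		Count: 3,                       // number of hot peers
	}

	// Pending influence of in-flight operators, indexed by RegionStatKind.
	infl := &statistics.Influence{
		Loads: make([]float64, statistics.RegionStatCount),
		Count: 1,
	}
	infl.Loads[statistics.RegionWriteBytes] = 128
	infl.Loads[statistics.RegionWriteKeys] = 64
	infl.Loads[statistics.RegionWriteQuery] = 16

	// ToLoadPred keeps the reported load as Current and adds the pending
	// write influence on top of it as Future.
	pred := load.ToLoadPred(statistics.Write, infl)

	fmt.Println(pred.Min().Loads)     // element-wise min of Current and Future
	fmt.Println(pred.Max().Loads)     // element-wise max of Current and Future
	fmt.Println(pred.Pending().Loads) // per-dimension gap between Max and Min
}

This mirrors how the hot scheduler consumes the prediction above: filterSrcStores reads LoadPred.Min() and pickDstStores reads LoadPred.Max() when checking the tolerance ratios.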