diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index b35de5048220..3aa07300238e 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -190,7 +190,7 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
 	h.Lock()
 	defer h.Unlock()
 
-	h.prepareForBalance(cluster)
+	h.prepareForBalance(typ, cluster)
 
 	switch typ {
 	case read:
@@ -203,21 +203,34 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
 
 // prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
 // each store
-func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
+func (h *hotScheduler) prepareForBalance(typ rwType, cluster opt.Cluster) {
 	h.summaryPendingInfluence()
 
 	storesLoads := cluster.GetStoresLoads()
 
-	{ // update read statistics
+	switch typ {
+	case read:
+		// update read statistics
 		regionRead := cluster.RegionReadStats()
 		h.stLoadInfos[readLeader] = summaryStoresLoad(
 			storesLoads,
 			h.pendingSums[readLeader],
 			regionRead,
 			read, core.LeaderKind)
+<<<<<<< HEAD
 	}
 
 	{ // update write statistics
+=======
+		h.stLoadInfos[readPeer] = summaryStoresLoad(
+			stores,
+			storesLoads,
+			h.pendingSums,
+			regionRead,
+			read, core.RegionKind)
+	case write:
+		// update write statistics
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 		regionWrite := cluster.RegionWriteStats()
 		h.stLoadInfos[writeLeader] = summaryStoresLoad(
 			storesLoads,
@@ -236,6 +249,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 // summaryPendingInfluence calculate the summary of pending Influence for each store
 // and clean the region from regionInfluence if they have ended operator.
 func (h *hotScheduler) summaryPendingInfluence() {
+<<<<<<< HEAD
 	for ty := resourceType(0); ty < resourceTypeLen; ty++ {
 		h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], h.calcPendingWeight)
 	}
@@ -258,6 +272,24 @@ func (h *hotScheduler) gcRegionPendings() {
 			if pendings[ty] != nil {
 				empty = false
 			}
+=======
+	ret := make(map[uint64]*Influence)
+	for id, p := range h.regionPendings {
+		maxZombieDur := p.maxZombieDuration
+		weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
+		if needGC {
+			delete(h.regionPendings, id)
+			schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
+			log.Debug("gc pending influence in hot region scheduler",
+				zap.Uint64("region-id", id),
+				zap.Time("create", p.op.GetCreateTime()),
+				zap.Time("now", time.Now()),
+				zap.Duration("zombie", maxZombieDur))
+			continue
+		}
+		if _, ok := ret[p.to]; !ok {
+			ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 		}
 		if empty {
 			delete(h.regionPendings, regionID)
@@ -278,12 +310,28 @@ func summaryStoresLoad(
 ) map[uint64]*storeLoadDetail {
 	// loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count)
 	loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
 
+<<<<<<< HEAD
 	allByteSum := 0.0
 	allKeySum := 0.0
 	allCount := 0.0
 	for id, loads := range storesLoads {
 		var byteRate, keyRate float64
+=======
+	allTiKVLoadSum := make([]float64, statistics.DimLen)
+	allTiKVCount := 0
+	allTiKVHotPeersCount := 0
+
+	for _, store := range stores {
+		id := store.GetID()
+		storeLoads, ok := storesLoads[id]
+		if !ok {
+			continue
+		}
+		isTiFlash := core.IsTiFlashStore(store.GetMeta())
+
+		loads := make([]float64, statistics.DimLen)
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 		switch rwTy {
 		case read:
 			byteRate, keyRate = loads[statistics.StoreReadBytes], loads[statistics.StoreReadKeys]
@@ -319,9 +367,20 @@ func summaryStoresLoad(
 				hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum)
 			}
 		}
+<<<<<<< HEAD
 		allByteSum += byteRate
 		allKeySum += keyRate
 		allCount += float64(len(hotPeers))
+=======
+
+		if !isTiFlash {
+			for i := range allTiKVLoadSum {
+				allTiKVLoadSum[i] += loads[i]
+			}
+			allTiKVCount += 1
+			allTiKVHotPeersCount += len(hotPeers)
+		}
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 
 		// Build store load prediction from current load and pending influence.
 		stLoadPred := (&storeLoad{
@@ -337,6 +396,7 @@ func summaryStoresLoad(
 		}
 	}
 
+<<<<<<< HEAD
 	storeLen := float64(len(storesLoads))
 	// store expectation byte/key rate and count for each store-load detail.
 	for id, detail := range loadDetail {
@@ -346,6 +406,20 @@ func summaryStoresLoad(
 		detail.LoadPred.Expect.ByteRate = byteExp
 		detail.LoadPred.Expect.KeyRate = keyExp
 		detail.LoadPred.Expect.Count = countExp
+=======
+	// store expectation rate and count for each store-load detail.
+	for id, detail := range loadDetail {
+		var allLoadSum = allTiKVLoadSum
+		var allStoreCount = float64(allTiKVCount)
+		var allHotPeersCount = float64(allTiKVHotPeersCount)
+		expectLoads := make([]float64, len(allLoadSum))
+		for i := range expectLoads {
+			expectLoads[i] = allLoadSum[i] / allStoreCount
+		}
+		expectCount := allHotPeersCount / allStoreCount
+		detail.LoadPred.Expect.Loads = expectLoads
+		detail.LoadPred.Expect.Count = expectCount
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 		// Debug
 		{
 			ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
@@ -378,7 +452,11 @@ func filterHotPeers(
 	return ret
 }
 
+<<<<<<< HEAD
 func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) bool {
+=======
+func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 	regionID := op.RegionID()
 	_, ok := h.regionPendings[regionID]
 	if ok {
@@ -386,6 +464,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
 		return false
 	}
 
+<<<<<<< HEAD
 	influence := newPendingInfluence(op, srcStore, dstStore, infl)
 	rcTy := toResourceType(rwTy, opTy)
 	h.pendings[rcTy][influence] = struct{}{}
@@ -396,6 +475,10 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
 		tmp[opTy] = op
 		h.regionPendings[regionID] = tmp
 	}
+=======
+	influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
+	h.regionPendings[regionID] = influence
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 
 	schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
 	return true
@@ -565,11 +648,30 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 		}
 	}
 
+<<<<<<< HEAD
 	for i := 0; i < len(ops); i++ {
 		// TODO: multiple operators need to be atomic.
 		if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy) {
 			return nil
 		}
+=======
+	if best == nil {
+		return nil
+	}
+
+	// Depending on the source of the statistics used, a different ZombieDuration will be used.
+	// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
+	var maxZombieDur time.Duration
+	switch {
+	case bs.rwTy == write && bs.opTy == transferLeader:
+		maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
+	default:
+		maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
+	}
+
+	if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, maxZombieDur) {
+		return nil
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 	}
 	return ops
 }
@@ -795,8 +897,16 @@ func (bs *balanceSolver) calcProgressiveRank() {
 	rank := int64(0)
 	if bs.rwTy == write && bs.opTy == transferLeader {
 		// In this condition, CPU usage is the matter.
+<<<<<<< HEAD
 		// Only consider about key rate.
 		if srcLd.KeyRate-peer.GetKeyRate() >= dstLd.KeyRate+peer.GetKeyRate() {
+=======
+		// Only consider key rate or query rate.
+		srcRate := srcLd.Loads[bs.writeLeaderFirstPriority]
+		dstRate := dstLd.Loads[bs.writeLeaderFirstPriority]
+		peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.writeLeaderFirstPriority))
+		if srcRate-peerRate >= dstRate+peerRate {
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 			rank = -1
 		}
 	} else {
diff --git a/server/schedulers/hot_region_config.go b/server/schedulers/hot_region_config.go
index 42f4b14b57de..5fa75493f776 100644
--- a/server/schedulers/hot_region_config.go
+++ b/server/schedulers/hot_region_config.go
@@ -33,6 +33,7 @@ import (
 // params about hot region.
 func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
 	return &hotRegionSchedulerConfig{
+<<<<<<< HEAD
 		MinHotByteRate:        100,
 		MinHotKeyRate:         10,
 		MaxZombieRounds:       3,
@@ -44,6 +45,25 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
 		MaxPeerNum:            1000,
 		SrcToleranceRatio:     1.05, // Tolerate 5% difference
 		DstToleranceRatio:     1.05, // Tolerate 5% difference
+=======
+		MinHotByteRate:         100,
+		MinHotKeyRate:          10,
+		MinHotQueryRate:        10,
+		MaxZombieRounds:        3,
+		MaxPeerNum:             1000,
+		ByteRateRankStepRatio:  0.05,
+		KeyRateRankStepRatio:   0.05,
+		QueryRateRankStepRatio: 0.05,
+		CountRankStepRatio:     0.01,
+		GreatDecRatio:          0.95,
+		MinorDecRatio:          0.99,
+		SrcToleranceRatio:      1.05, // Tolerate 5% difference
+		DstToleranceRatio:      1.05, // Tolerate 5% difference
+		ReadPriorities:         []string{QueryPriority, BytePriority},
+		WriteLeaderPriorities:  []string{KeyPriority, BytePriority},
+		WritePeerPriorities:    []string{BytePriority, KeyPriority},
+		StrictPickingStore:     true,
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
 	}
 }
 
@@ -73,12 +93,18 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
 	return schedule.EncodeConfig(conf)
 }
 
-func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
+func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration {
 	conf.RLock()
 	defer conf.RUnlock()
 	return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
 }
 
+func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration {
+	conf.RLock()
+	defer conf.RUnlock()
+	return time.Duration(conf.MaxZombieRounds) * statistics.RegionHeartBeatReportInterval * time.Second
+}
+
 func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
 	conf.RLock()
 	defer conf.RUnlock()
@@ -214,5 +240,4 @@ func (conf *hotRegionSchedulerConfig) persist() error {
 	}
 
 	return conf.storage.SaveScheduleConfig(HotRegionName, data)
-
 }
diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go
index 909cfce314df..4a4599e7de18 100644
--- a/server/schedulers/hot_test.go
+++ b/server/schedulers/hot_test.go
@@ -70,7 +70,14 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
 		}
 		c.Assert(err, IsNil)
 		c.Assert(op, NotNil)
+<<<<<<< HEAD:server/schedulers/hot_test.go
 		return op
+=======
+		op.Start()
+		operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
+		operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
+		return newPendingInfluence(op, 2, 4, Influence{}, hb.conf.GetStoreStatZombieDuration())
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949)):server/schedulers/hot_region_test.go
 	}
 	doneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
 		op := notDoneOp(region, ty)
@@ -1118,6 +1125,7 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
 		}
 	}
 
+<<<<<<< HEAD:server/schedulers/hot_test.go
 	// try schedule
 	hb.prepareForBalance(tc)
 	leaderSolver := newBalanceSolver(hb, tc, kind, transferLeader)
@@ -1127,6 +1135,19 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
 	tc.SetHotRegionCacheHitsThreshold(0)
 	c.Check(leaderSolver.filterHotPeers(), HasLen, 1)
 	tc.SetHotRegionCacheHitsThreshold(threshold)
+=======
+	if testcase.DegreeAfterTransferLeader >= 3 {
+		// try schedule
+		hb.prepareForBalance(testcase.kind, tc)
+		leaderSolver := newBalanceSolver(hb, tc, testcase.kind, transferLeader)
+		leaderSolver.cur = &solution{srcStoreID: 2}
+		c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule
+		threshold := tc.GetHotRegionCacheHitsThreshold()
+		tc.SetHotRegionCacheHitsThreshold(0)
+		c.Check(leaderSolver.filterHotPeers(), HasLen, 1)
+		tc.SetHotRegionCacheHitsThreshold(threshold)
+	}
+>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949)):server/schedulers/hot_region_test.go
 
 	// move peer: add peer and remove peer
 	items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3, 4}, 1)
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index af04b2678979..d6caba4b943c 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -17,6 +17,7 @@ import (
 	"math"
 	"net/url"
 	"strconv"
+	"time"
 
 	"github.com/montanaflynn/stats"
 	"github.com/pingcap/log"
@@ -161,17 +162,19 @@ func (infl Influence) add(rhs *Influence, w float64) Influence {
 
 // TODO: merge it into OperatorInfluence.
 type pendingInfluence struct {
-	op       *operator.Operator
-	from, to uint64
-	origin   Influence
+	op                *operator.Operator
+	from, to          uint64
+	origin            Influence
+	maxZombieDuration time.Duration
 }
 
-func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) *pendingInfluence {
+func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, maxZombieDur time.Duration) *pendingInfluence {
 	return &pendingInfluence{
-		op:     op,
-		from:   from,
-		to:     to,
-		origin: infl,
+		op:                op,
+		from:              from,
+		to:                to,
+		origin:            infl,
+		maxZombieDuration: maxZombieDur,
 	}
 }