scheduler: fix inaccurate statistics and config (#3949)
Signed-off-by: HunDunDM <hundundm@gmail.com>
HunDunDM committed Sep 8, 2021
1 parent 5fa185c commit f2ea3be
Showing 4 changed files with 65 additions and 33 deletions.
67 changes: 46 additions & 21 deletions server/schedulers/hot_region.go
@@ -174,7 +174,7 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
h.Lock()
defer h.Unlock()

h.prepareForBalance(cluster)
h.prepareForBalance(typ, cluster)

switch typ {
case read:
@@ -185,13 +185,18 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
return nil
}

func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
// prepareForBalance calculates the summary of pending Influence for each store and prepares the load detail for
// each store.
func (h *hotScheduler) prepareForBalance(typ rwType, cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesStat := cluster.GetStoresStats()
minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
{ // update read statistics

switch typ {
case read:
// update read statistics
regionRead := cluster.RegionReadStats()
storeByte := storesStat.GetStoresBytesReadStat()
storeKey := storesStat.GetStoresKeysReadStat()
@@ -205,9 +210,8 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
minHotDegree,
hotRegionThreshold,
read, core.LeaderKind, mixed)
}

{ // update write statistics
case write:
// update write statistics
regionWrite := cluster.RegionWriteStats()
storeByte := storesStat.GetStoresBytesWriteStat()
storeKey := storesStat.GetStoresKeysWriteStat()
@@ -266,11 +270,11 @@ func getHotRegionThreshold(stats *statistics.StoresStats, typ rwType) [2]uint64
// and cleans regions from regionInfluence once their operators have ended.
// It makes each dim rate or count become `weight` times the origin value.
func (h *hotScheduler) summaryPendingInfluence() {
maxZombieDur := h.conf.GetMaxZombieDuration()
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
ret := make(map[uint64]Influence)
pendings := h.pendings[ty]
for p := range pendings {
maxZombieDur := p.maxZombieDuration
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
if needGC {
id := p.op.RegionID()
@@ -305,9 +309,10 @@ func summaryStoresLoad(
hotPeerFilterTy hotPeerFilterType,
) map[uint64]*storeLoadDetail {
loadDetail := make(map[uint64]*storeLoadDetail, len(storeByteRate))
allByteSum := 0.0
allKeySum := 0.0
allCount := 0.0
allTiKVByteSum := 0.0
allTiKVKeySum := 0.0
allTiKVCount := 0
allTiKVHotPeersCount := 0

// Stores without byte rate statistics are not available to schedule.
for _, store := range stores {
@@ -320,6 +325,7 @@ func summaryStoresLoad(
continue
}
keyRate := storeKeyRate[id]
isTiFlash := core.IsTiFlashStore(store.GetMeta())

// Find all hot peers first
hotPeers := make([]*statistics.HotPeerStat, 0)
@@ -347,9 +353,13 @@ func summaryStoresLoad(
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum)
}
}
allByteSum += byteRate
allKeySum += keyRate
allCount += float64(len(hotPeers))

if !isTiFlash {
allTiKVByteSum += byteRate
allTiKVKeySum += keyRate
allTiKVCount += 1
allTiKVHotPeersCount += len(hotPeers)
}

// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
Expand All @@ -365,12 +375,13 @@ func summaryStoresLoad(
HotPeers: hotPeers,
}
}
storeLen := float64(len(storeByteRate))

byteExp := allTiKVByteSum / float64(allTiKVCount)
keyExp := allTiKVKeySum / float64(allTiKVCount)
countExp := float64(allTiKVHotPeersCount) / float64(allTiKVCount)

// Store the expected byte/key rate and count for each store-load detail.
for id, detail := range loadDetail {
byteExp := allByteSum / storeLen
keyExp := allKeySum / storeLen
countExp := allCount / storeLen
detail.LoadPred.Future.ExpByteRate = byteExp
detail.LoadPred.Future.ExpKeyRate = keyExp
detail.LoadPred.Future.ExpCount = countExp
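
This hunk is the heart of the statistics fix: the expected byte/key rate and hot-peer count are now averaged over TiKV stores only, so TiFlash traffic no longer inflates the target that every store is compared against. Below is a minimal before/after sketch with made-up numbers; the `storeStat` type and the values are illustrative and not taken from PD.

```go
package main

import "fmt"

type storeStat struct {
	byteRate  float64
	isTiFlash bool
}

func main() {
	stores := []storeStat{
		{byteRate: 100},
		{byteRate: 120},
		{byteRate: 900, isTiFlash: true}, // TiFlash replication traffic
	}

	// Old behaviour: every store with statistics feeds the expectation.
	var byteSum float64
	for _, s := range stores {
		byteSum += s.byteRate
	}
	fmt.Printf("old ExpByteRate: %.1f\n", byteSum/float64(len(stores))) // 373.3, inflated by TiFlash

	// New behaviour: only TiKV stores are summed and counted.
	var tikvByteSum float64
	tikvCount := 0
	for _, s := range stores {
		if s.isTiFlash {
			continue
		}
		tikvByteSum += s.byteRate
		tikvCount++
	}
	fmt.Printf("new ExpByteRate: %.1f\n", tikvByteSum/float64(tikvCount)) // 110.0
}
```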
@@ -426,15 +437,15 @@ func isHotPeerFiltered(peer *statistics.HotPeerStat, hotRegionThreshold [2]uint6
return isFiltered
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) bool {
func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_fails").Inc()
return false
}

influence := newPendingInfluence(op, srcStore, dstStore, infl)
influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
rcTy := toResourceType(rwTy, opTy)
h.pendings[rcTy][influence] = struct{}{}
h.regionPendings[regionID] = op
@@ -605,7 +616,21 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
}

if best == nil || !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, bs.rwTy, bs.opTy) {
if best == nil {
return nil
}

// A different ZombieDuration is used depending on the source of the statistics.
// If the statistics come from the sum of Regions, a longer ZombieDuration is used.
var maxZombieDur time.Duration
switch {
case bs.rwTy == write && bs.opTy == transferLeader:
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}

if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, bs.rwTy, bs.opTy, maxZombieDur) {
return nil
}
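
The new block in `solve()` chooses the zombie window per operator instead of using a single global value: write-leader operators are scored from Region-level statistics, which refresh more slowly, so their pending influence should survive longer. The sketch below mirrors that selection under the assumption that the two config getters return roughly 30s (store stats) and 3m (regions stats); the types and helper are stand-ins, not the scheduler's API.

```go
package main

import (
	"fmt"
	"time"
)

type rwType int
type opType int

const (
	read rwType = iota
	write
)

const (
	movePeer opType = iota
	transferLeader
)

// pickMaxZombieDur mirrors the switch added in solve(): only operators whose score
// comes from Region-level statistics (write + transferLeader) get the longer window.
// In the scheduler the two durations come from the hot region config getters.
func pickMaxZombieDur(rw rwType, op opType, storeStatZombie, regionsStatZombie time.Duration) time.Duration {
	if rw == write && op == transferLeader {
		return regionsStatZombie
	}
	return storeStatZombie
}

func main() {
	storeStatZombie := 30 * time.Second  // assuming 3 rounds x 10s store heartbeats
	regionsStatZombie := 3 * time.Minute // assuming 3 rounds x 60s region heartbeats

	fmt.Println(pickMaxZombieDur(write, transferLeader, storeStatZombie, regionsStatZombie)) // 3m0s
	fmt.Println(pickMaxZombieDur(write, movePeer, storeStatZombie, regionsStatZombie))       // 30s
}
```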

@@ -813,7 +838,7 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is what matters.
// Only consider about key rate.
// Only consider key rate.
if srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() {
rank = -1
}
11 changes: 8 additions & 3 deletions server/schedulers/hot_region_config.go
@@ -36,12 +36,12 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
MinHotByteRate: 100,
MinHotKeyRate: 10,
MaxZombieRounds: 3,
MaxPeerNum: 1000,
ByteRateRankStepRatio: 0.05,
KeyRateRankStepRatio: 0.05,
CountRankStepRatio: 0.01,
GreatDecRatio: 0.95,
MinorDecRatio: 0.99,
MaxPeerNum: 1000,
SrcToleranceRatio: 1.05, // Tolerate 5% difference
DstToleranceRatio: 1.05, // Tolerate 5% difference
}
@@ -73,12 +73,18 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(conf)
}

func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.RegionHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
conf.RLock()
defer conf.RUnlock()
Expand Down Expand Up @@ -214,5 +220,4 @@ func (conf *hotRegionSchedulerConfig) persist() error {

}
return conf.storage.SaveScheduleConfig(HotRegionName, data)

}
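
The two getters differ only in which heartbeat interval they multiply by `MaxZombieRounds`. Assuming the usual 10-second store heartbeat and 60-second region heartbeat (the constants below are stand-ins for the `statistics` package), the default of 3 rounds works out as follows:

```go
package main

import (
	"fmt"
	"time"
)

// Stand-ins for statistics.StoreHeartBeatReportInterval and
// statistics.RegionHeartBeatReportInterval, assumed to be 10s and 60s.
const (
	storeHeartBeatReportInterval  = 10
	regionHeartBeatReportInterval = 60
)

func main() {
	maxZombieRounds := 3 // default set by initHotRegionScheduleConfig

	storeStatZombie := time.Duration(maxZombieRounds) * storeHeartBeatReportInterval * time.Second
	regionsStatZombie := time.Duration(maxZombieRounds) * regionHeartBeatReportInterval * time.Second

	fmt.Println(storeStatZombie)   // 30s
	fmt.Println(regionsStatZombie) // 3m0s
}
```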
2 changes: 1 addition & 1 deletion server/schedulers/hot_test.go
@@ -83,7 +83,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
regionID := uint64(i*len(typs) + j + 1)
region := newTestRegion(regionID)
op := creator(region, typ)
influence := newPendingInfluence(op, 2, 4, Influence{})
influence := newPendingInfluence(op, 2, 4, Influence{}, hb.conf.GetStoreStatZombieDuration())
hb.pendings[writePeer][influence] = struct{}{}
hb.regionPendings[regionID] = op
}
18 changes: 10 additions & 8 deletions server/schedulers/utils.go
@@ -176,17 +176,19 @@ func (infl Influence) add(rhs *Influence, w float64) Influence {

// TODO: merge it into OperatorInfluence.
type pendingInfluence struct {
op *operator.Operator
from, to uint64
origin Influence
op *operator.Operator
from, to uint64
origin Influence
maxZombieDuration time.Duration
}

func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) *pendingInfluence {
func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, maxZombieDur time.Duration) *pendingInfluence {
return &pendingInfluence{
op: op,
from: from,
to: to,
origin: infl,
op: op,
from: from,
to: to,
origin: infl,
maxZombieDuration: maxZombieDur,
}
}
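
Since each `pendingInfluence` now carries its own `maxZombieDuration`, the GC in `summaryPendingInfluence` can age out influences at a rate that matches the statistics they were computed from. The check below is a hypothetical simplification of that idea, not PD's actual `calcPendingInfluence`:

```go
package main

import (
	"fmt"
	"time"
)

// zombieExpired is a hypothetical simplification: once an operator has been finished
// for longer than its influence's own maxZombieDuration, the pending influence should
// be garbage-collected instead of still counting against the store.
func zombieExpired(finishedAt, now time.Time, maxZombieDur time.Duration) bool {
	return now.Sub(finishedAt) > maxZombieDur
}

func main() {
	now := time.Now()
	finishedAt := now.Add(-45 * time.Second) // operator completed 45s ago

	fmt.Println(zombieExpired(finishedAt, now, 30*time.Second)) // true: store-stat influence is dropped
	fmt.Println(zombieExpired(finishedAt, now, 3*time.Minute))  // false: regions-stat influence still pending
}
```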
