Commit 5272137
This is an automated cherry-pick of #3949
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
HunDunDM authored and ti-chi-bot committed Aug 11, 2021
1 parent 0c1246d commit 5272137
Showing 4 changed files with 1,747 additions and 11 deletions.
131 changes: 130 additions & 1 deletion server/schedulers/hot_region.go
@@ -175,7 +175,7 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
h.Lock()
defer h.Unlock()

- h.prepareForBalance(cluster)
+ h.prepareForBalance(typ, cluster)

switch typ {
case read:
@@ -186,13 +186,25 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
return nil
}

<<<<<<< HEAD
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
=======
// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
// each store
func (h *hotScheduler) prepareForBalance(typ rwType, cluster opt.Cluster) {
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
h.summaryPendingInfluence()

storesStat := cluster.GetStoresStats()

<<<<<<< HEAD
minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
{ // update read statistics
=======
switch typ {
case read:
// update read statistics
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
regionRead := cluster.RegionReadStats()
storeByte := storesStat.GetStoresBytesReadStat()
storeKey := storesStat.GetStoresKeysReadStat()
@@ -202,12 +214,18 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
storeKey,
h.pendingSums[readLeader],
regionRead,
<<<<<<< HEAD
minHotDegree,
hotRegionThreshold,
read, core.LeaderKind, mixed)
}

{ // update write statistics
=======
read, core.RegionKind)
case write:
// update write statistics
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
regionWrite := cluster.RegionWriteStats()
storeByte := storesStat.GetStoresBytesWriteStat()
storeKey := storesStat.GetStoresKeysWriteStat()
@@ -232,13 +250,33 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
}
}
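For context on the conflicted block below: the cherry-picked side replaces the per-type hot-region thresholds with a pending-influence summary that both garbage-collects ended operators and scales every dimension of an operator's remaining influence by a weight before summing it per store. The scaling half falls outside the shown hunks, so here is a minimal sketch of that idea, using simplified stand-in types rather than PD's:

```go
package main

import "fmt"

// influence is a simplified stand-in for the scheduler's pending influence:
// one value per dimension (byte rate, key rate, count).
type influence struct{ loads []float64 }

// addWeighted accumulates infl into sum with every dimension scaled by
// weight, i.e. each dim rate or count becomes `weight` times its origin
// value, as the summaryPendingInfluence comment below puts it.
func addWeighted(sum, infl *influence, weight float64) {
	for i := range sum.loads {
		sum.loads[i] += infl.loads[i] * weight
	}
}

func main() {
	sum := &influence{loads: make([]float64, 3)}
	addWeighted(sum, &influence{loads: []float64{100, 10, 1}}, 0.5)
	fmt.Println(sum.loads) // [50 5 0.5]
}
```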

<<<<<<< HEAD
func getHotRegionThreshold(stats *statistics.StoresStats, typ rwType) [2]uint64 {
var hotRegionThreshold [2]uint64
switch typ {
case write:
hotRegionThreshold[0] = uint64(stats.TotalBytesWriteRate() / divisor)
if hotRegionThreshold[0] < hotWriteRegionMinFlowRate {
hotRegionThreshold[0] = hotWriteRegionMinFlowRate
=======
// summaryPendingInfluence calculate the summary of pending Influence for each store
// and clean the region from regionInfluence if they have ended operator.
// It makes each dim rate or count become `weight` times to the origin value.
func (h *hotScheduler) summaryPendingInfluence() {
ret := make(map[uint64]*Influence)
for id, p := range h.regionPendings {
maxZombieDur := p.maxZombieDuration
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
if needGC {
delete(h.regionPendings, id)
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
log.Debug("gc pending influence in hot region scheduler",
zap.Uint64("region-id", id),
zap.Time("create", p.op.GetCreateTime()),
zap.Time("now", time.Now()),
zap.Duration("zombie", maxZombieDur))
continue
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
}
hotRegionThreshold[1] = uint64(stats.TotalKeysWriteRate() / divisor)
if hotRegionThreshold[1] < hotWriteRegionMinKeyRate {
@@ -301,6 +339,7 @@ func summaryStoresLoad(
kind core.ResourceKind,
hotPeerFilterTy hotPeerFilterType,
) map[uint64]*storeLoadDetail {
<<<<<<< HEAD
loadDetail := make(map[uint64]*storeLoadDetail, len(storeByteRate))
allByteSum := 0.0
allKeySum := 0.0
@@ -310,6 +349,33 @@
for id, byteRate := range storeByteRate {
keyRate := storeKeyRate[id]

=======
// loadDetail stores the storeID -> hotPeers stat and its current and future stat(rate,count)
loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
allTiKVLoadSum := make([]float64, statistics.DimLen)
allTiKVCount := 0
allTiKVHotPeersCount := 0

for _, store := range stores {
id := store.GetID()
storeLoads, ok := storesLoads[id]
if !ok {
continue
}
isTiFlash := core.IsTiFlashStore(store.GetMeta())

loads := make([]float64, statistics.DimLen)
switch rwTy {
case read:
loads[statistics.ByteDim] = storeLoads[statistics.StoreReadBytes]
loads[statistics.KeyDim] = storeLoads[statistics.StoreReadKeys]
loads[statistics.QueryDim] = storeLoads[statistics.StoreReadQuery]
case write:
loads[statistics.ByteDim] = storeLoads[statistics.StoreWriteBytes]
loads[statistics.KeyDim] = storeLoads[statistics.StoreWriteKeys]
loads[statistics.QueryDim] = storeLoads[statistics.StoreWriteQuery]
}
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
// Find all hot peers first
hotPeers := make([]*statistics.HotPeerStat, 0)
{
@@ -336,9 +402,20 @@ func summaryStoresLoad(
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(keySum)
}
}
<<<<<<< HEAD
allByteSum += byteRate
allKeySum += keyRate
allCount += float64(len(hotPeers))
=======

if !isTiFlash {
for i := range allTiKVLoadSum {
allTiKVLoadSum[i] += loads[i]
}
allTiKVCount += 1
allTiKVHotPeersCount += len(hotPeers)
}
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))

// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
Expand All @@ -353,6 +430,7 @@ func summaryStoresLoad(
HotPeers: hotPeers,
}
}
<<<<<<< HEAD
storeLen := float64(len(storeByteRate))

for id, detail := range loadDetail {
@@ -362,6 +440,21 @@
detail.LoadPred.Future.ExpByteRate = byteExp
detail.LoadPred.Future.ExpKeyRate = keyExp
detail.LoadPred.Future.ExpCount = countExp
=======

// store expectation rate and count for each store-load detail.
for id, detail := range loadDetail {
var allLoadSum = allTiKVLoadSum
var allStoreCount = float64(allTiKVCount)
var allHotPeersCount = float64(allTiKVHotPeersCount)
expectLoads := make([]float64, len(allLoadSum))
for i := range expectLoads {
expectLoads[i] = allLoadSum[i] / allStoreCount
}
expectCount := allHotPeersCount / allStoreCount
detail.LoadPred.Expect.Loads = expectLoads
detail.LoadPred.Expect.Count = expectCount
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
// Debug
{
ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
@@ -398,6 +491,7 @@ func filterHotPeers(
return ret
}

<<<<<<< HEAD
func isHotPeerFiltered(peer *statistics.HotPeerStat, hotRegionThreshold [2]uint64, hotPeerFilterTy hotPeerFilterType) bool {
var isFiltered bool
switch hotPeerFilterTy {
@@ -415,16 +509,24 @@ func isHotPeerFiltered(peer *statistics.HotPeerStat, hotRegionThreshold [2]uint6
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, rwTy rwType, opTy opType) bool {
=======
func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_fails").Inc()
return false
}

<<<<<<< HEAD
influence := newPendingInfluence(op, srcStore, dstStore, infl)
rcTy := toResourceType(rwTy, opTy)
h.pendings[rcTy][influence] = struct{}{}
=======
influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
h.regionPendings[regionID] = influence
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))

h.regionPendings[regionID] = [2]*operator.Operator{nil, nil}
{ // h.pendingOpInfos[regionID][ty] = influence
@@ -599,11 +701,30 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
}

<<<<<<< HEAD
for i := 0; i < len(ops); i++ {
// TODO: multiple operators need to be atomic.
if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i], bs.rwTy, bs.opTy) {
return nil
}
=======
if best == nil {
return nil
}

// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
var maxZombieDur time.Duration
switch {
case bs.rwTy == write && bs.opTy == transferLeader:
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}

if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, maxZombieDur) {
return nil
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
}
return ops
}
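The switch above picks a longer zombie duration when the operator's influence is measured from region-level statistics (the write + transfer-leader case), since region heartbeats arrive less often than store heartbeats. A hedged sketch of the rule, assuming PD's defaults of MaxZombieRounds = 3, a 10s store heartbeat, and a 60s region heartbeat:

```go
package main

import (
	"fmt"
	"time"
)

const (
	storeHeartBeat  = 10 * time.Second // assumed statistics.StoreHeartBeatReportInterval
	regionHeartBeat = 60 * time.Second // assumed statistics.RegionHeartBeatReportInterval
	maxZombieRounds = 3                // assumed default MaxZombieRounds
)

// pickZombieDuration mirrors the switch in solve(): influences backed by
// region statistics age out more slowly than store-statistics ones.
func pickZombieDuration(isWrite, isTransferLeader bool) time.Duration {
	if isWrite && isTransferLeader {
		return maxZombieRounds * regionHeartBeat
	}
	return maxZombieRounds * storeHeartBeat
}

func main() {
	fmt.Println(pickZombieDuration(true, true))  // 3m0s
	fmt.Println(pickZombieDuration(true, false)) // 30s
}
```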
@@ -816,8 +937,16 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is the matter.
<<<<<<< HEAD
// Only consider about key rate.
if srcLd.KeyRate >= dstLd.KeyRate+peer.GetKeyRate() {
=======
// Only consider key rate or query rate.
srcRate := srcLd.Loads[bs.writeLeaderFirstPriority]
dstRate := dstLd.Loads[bs.writeLeaderFirstPriority]
peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.writeLeaderFirstPriority))
if srcRate-peerRate >= dstRate+peerRate {
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
rank = -1
}
} else {
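The reworked write-leader rank check is worth spelling out: instead of comparing raw rates, it asks whether the source store, after shedding the peer's rate, would still carry at least as much load as the destination after absorbing it. A standalone check of that inequality with hypothetical rates:

```go
package main

import "fmt"

// improvesBalance reports the calcProgressiveRank condition
// srcRate-peerRate >= dstRate+peerRate: the move only earns rank -1 if the
// source stays at or above the destination even after the transfer.
func improvesBalance(srcRate, dstRate, peerRate float64) bool {
	return srcRate-peerRate >= dstRate+peerRate
}

func main() {
	fmt.Println(improvesBalance(100, 40, 20)) // true: 80 >= 60
	fmt.Println(improvesBalance(100, 40, 35)) // false: 65 < 75
}
```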
29 changes: 27 additions & 2 deletions server/schedulers/hot_region_config.go
@@ -33,6 +33,7 @@ import (
// params about hot region.
func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
return &hotRegionSchedulerConfig{
<<<<<<< HEAD
MinHotByteRate: 100,
MinHotKeyRate: 10,
MaxZombieRounds: 3,
Expand All @@ -44,6 +45,25 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
MaxPeerNum: 1000,
SrcToleranceRatio: 1.05, // Tolerate 5% difference
DstToleranceRatio: 1.05, // Tolerate 5% difference
=======
MinHotByteRate: 100,
MinHotKeyRate: 10,
MinHotQueryRate: 10,
MaxZombieRounds: 3,
MaxPeerNum: 1000,
ByteRateRankStepRatio: 0.05,
KeyRateRankStepRatio: 0.05,
QueryRateRankStepRatio: 0.05,
CountRankStepRatio: 0.01,
GreatDecRatio: 0.95,
MinorDecRatio: 0.99,
SrcToleranceRatio: 1.05, // Tolerate 5% difference
DstToleranceRatio: 1.05, // Tolerate 5% difference
ReadPriorities: []string{QueryPriority, BytePriority},
WriteLeaderPriorities: []string{KeyPriority, BytePriority},
WritePeerPriorities: []string{BytePriority, KeyPriority},
StrictPickingStore: true,
>>>>>>> 22f23fe0c (scheduler: fix inaccurate statistics and config (#3949))
}
}
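The new defaults make the balancing dimensions explicit per scheduling kind: reads are balanced by query rate first and byte rate second, write leaders by key rate then byte rate, and write peers by byte rate then key rate. A hedged sketch of how such a priority list could drive a store comparison (illustrative names, not PD's actual comparator):

```go
package main

import "fmt"

// lessByPriority compares two stores' loads dimension by dimension in
// priority order, falling through to the next dimension on a tie.
func lessByPriority(a, b map[string]float64, priorities []string) bool {
	for _, dim := range priorities {
		if a[dim] != b[dim] {
			return a[dim] < b[dim]
		}
	}
	return false
}

func main() {
	s1 := map[string]float64{"query": 500, "byte": 1 << 20}
	s2 := map[string]float64{"query": 500, "byte": 2 << 20}
	// Equal query rates, so the byte dimension (second priority) decides.
	fmt.Println(lessByPriority(s1, s2, []string{"query", "byte"})) // true
}
```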

@@ -73,12 +93,18 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(conf)
}

- func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
+ func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.RegionHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
conf.RLock()
defer conf.RUnlock()
@@ -214,5 +240,4 @@ func (conf *hotRegionSchedulerConfig) persist() error {

}
return conf.storage.SaveScheduleConfig(HotRegionName, data)

}

