scheduler: fix inaccurate statistics and config #3949

Merged · 3 commits · Aug 4, 2021
Changes from 2 commits
63 changes: 44 additions & 19 deletions server/schedulers/hot_region.go
@@ -141,7 +141,7 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
h.Lock()
defer h.Unlock()

h.prepareForBalance(cluster)
h.prepareForBalance(typ, cluster)

switch typ {
case read:
@@ -154,13 +154,15 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope

// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
// each store
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
func (h *hotScheduler) prepareForBalance(typ rwType, cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()

{ // update read statistics
switch typ {
case read:
// update read statistics
regionRead := cluster.RegionReadStats()
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
@@ -174,9 +176,8 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.pendingSums,
regionRead,
read, core.RegionKind)
}

{ // update write statistics
case write:
// update write statistics
regionWrite := cluster.RegionWriteStats()
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
@@ -198,9 +199,9 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
// and clean the region from regionInfluence if they have ended operator.
// It makes each dim rate or count become `weight` times to the origin value.
func (h *hotScheduler) summaryPendingInfluence() {
maxZombieDur := h.conf.GetMaxZombieDuration()
ret := make(map[uint64]*Influence)
for id, p := range h.regionPendings {
maxZombieDur := p.maxZombieDuration
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
if needGC {
delete(h.regionPendings, id)
@@ -236,15 +237,18 @@ func summaryStoresLoad(
) map[uint64]*storeLoadDetail {
// loadDetail stores the storeID -> hotPeers stat and its current and future stat(rate,count)
loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
allLoadSum := make([]float64, statistics.DimLen)
allCount := 0.0
allTiKVLoadSum := make([]float64, statistics.DimLen)
allTiKVCount := 0
allTiKVHotPeersCount := 0

for _, store := range stores {
id := store.GetID()
storeLoads, ok := storesLoads[id]
if !ok {
continue
}
isTiFlash := core.IsTiFlashStore(store.GetMeta())

loads := make([]float64, statistics.DimLen)
switch rwTy {
case read:
@@ -291,10 +295,14 @@ func summaryStoresLoad(
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.QueryDim])
}
}
for i := range allLoadSum {
allLoadSum[i] += loads[i]

if !isTiFlash {
for i := range allTiKVLoadSum {
allTiKVLoadSum[i] += loads[i]
}
allTiKVCount += 1
allTiKVHotPeersCount += len(hotPeers)
}
allCount += float64(len(hotPeers))

// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
@@ -309,14 +317,17 @@ func summaryStoresLoad(
HotPeers: hotPeers,
}
}
storeLen := float64(len(storesLoads))

// store expectation rate and count for each store-load detail.
for id, detail := range loadDetail {
var allLoadSum = allTiKVLoadSum
var allStoreCount = float64(allTiKVCount)
var allHotPeersCount = float64(allTiKVHotPeersCount)
expectLoads := make([]float64, len(allLoadSum))
for i := range expectLoads {
expectLoads[i] = allLoadSum[i] / storeLen
expectLoads[i] = allLoadSum[i] / allStoreCount
}
expectCount := allCount / storeLen
expectCount := allHotPeersCount / allStoreCount
detail.LoadPred.Expect.Loads = expectLoads
detail.LoadPred.Expect.Count = expectCount
// Debug
@@ -355,15 +366,15 @@ func filterHotPeers(
return ret
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence) bool {
func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_fails").Inc()
return false
}

influence := newPendingInfluence(op, srcStore, dstStore, infl)
influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
h.regionPendings[regionID] = influence

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
@@ -567,7 +578,21 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
}

if best == nil || !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl) {
if best == nil {
return nil
}

// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
var maxZombieDur time.Duration
switch {
case bs.rwTy == write && bs.opTy == transferLeader:
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}

if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, maxZombieDur) {
return nil
}
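
To illustrate why the window chosen above matters: region statistics are reported much less often than store statistics, so an influence derived from them must stay alive longer before it can be treated as stale. A minimal, self-contained sketch of that idea (the pending type, its zombie method, and the concrete durations are illustrative stand-ins, not the scheduler's actual calcPendingInfluence logic):

package main

import (
	"fmt"
	"time"
)

// pending is a stripped-down stand-in for pendingInfluence: it remembers the
// zombie duration that was chosen when the operator was scheduled.
type pending struct {
	finishedAt        time.Time
	maxZombieDuration time.Duration
}

// zombie reports whether the influence has outlived its window and can be
// garbage-collected instead of still counting against the store.
func (p pending) zombie(now time.Time) bool {
	return now.Sub(p.finishedAt) > p.maxZombieDuration
}

func main() {
	now := time.Now()
	regionBased := pending{finishedAt: now.Add(-time.Minute), maxZombieDuration: 3 * time.Minute}
	storeBased := pending{finishedAt: now.Add(-time.Minute), maxZombieDuration: 30 * time.Second}
	fmt.Println(regionBased.zombie(now), storeBased.zombie(now)) // false true
}

For the same one-minute-old operator, only the influence tied to the shorter store-stat window is considered expired.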

@@ -809,7 +834,7 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is the matter.
// Only consider about key rate or query rate.
// Only consider key rate or query rate.
srcRate := srcLd.Loads[bs.writeLeaderFirstPriority]
dstRate := dstLd.Loads[bs.writeLeaderFirstPriority]
peerRate := peer.GetLoad(getRegionStatKind(bs.rwTy, bs.writeLeaderFirstPriority))
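The other statistical fix in hot_region.go: the expectation each store is compared against is now averaged over TiKV stores only, so a TiFlash node that also reports load no longer skews the per-store expectation. A standalone sketch of that averaging with made-up numbers (storeInfo and the load values are hypothetical, not the scheduler's own types):

package main

import "fmt"

// storeInfo is a hypothetical stand-in for the scheduler's store metadata.
type storeInfo struct {
	isTiFlash bool
	loads     []float64 // one value per dimension (byte, key, query)
	hotPeers  int
}

func main() {
	const dimLen = 3
	stores := []storeInfo{
		{loads: []float64{100, 10, 5}, hotPeers: 4},
		{loads: []float64{60, 8, 3}, hotPeers: 2},
		{isTiFlash: true, loads: []float64{500, 50, 0}, hotPeers: 9}, // excluded from the average
	}

	sum := make([]float64, dimLen)
	tikvStores, tikvHotPeers := 0, 0
	for _, s := range stores {
		if s.isTiFlash {
			continue // TiFlash loads must not dilute TiKV expectations
		}
		for i := range sum {
			sum[i] += s.loads[i]
		}
		tikvStores++
		tikvHotPeers += s.hotPeers
	}

	expectLoads := make([]float64, dimLen)
	for i := range expectLoads {
		expectLoads[i] = sum[i] / float64(tikvStores)
	}
	expectCount := float64(tikvHotPeers) / float64(tikvStores)
	fmt.Println(expectLoads, expectCount) // [80 9 4] 3
}

Before this change the divisor was the total number of stores with load reports, so each TiFlash store pulled the expectation away from what the TiKV stores could actually be balanced toward.
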
11 changes: 8 additions & 3 deletions server/schedulers/hot_region_config.go
@@ -46,13 +46,13 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
MinHotKeyRate: 10,
MinHotQueryRate: 10,
MaxZombieRounds: 3,
MaxPeerNum: 1000,
ByteRateRankStepRatio: 0.05,
KeyRateRankStepRatio: 0.05,
QueryRateRankStepRatio: 0.05,
CountRankStepRatio: 0.01,
GreatDecRatio: 0.95,
MinorDecRatio: 0.99,
MaxPeerNum: 1000,
SrcToleranceRatio: 1.05, // Tolerate 5% difference
DstToleranceRatio: 1.05, // Tolerate 5% difference
ReadPriorities: []string{QueryPriority, BytePriority},
@@ -94,12 +94,18 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(conf)
}

func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.RegionHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
conf.RLock()
defer conf.RUnlock()
@@ -247,5 +253,4 @@ func (conf *hotRegionSchedulerConfig) persist() error {

}
return conf.storage.SaveScheduleConfig(HotRegionName, data)

}
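
For a sense of scale: with the default MaxZombieRounds of 3, the two getters yield quite different windows. Assuming store heartbeats are reported every 10 seconds and region heartbeats every 60 seconds (the usual PD intervals; the authoritative constants live in the statistics package), that works out to roughly 30 seconds versus 3 minutes:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed values; the real constants are StoreHeartBeatReportInterval and
	// RegionHeartBeatReportInterval in the statistics package.
	const (
		maxZombieRounds               = 3
		storeHeartBeatReportInterval  = 10 // seconds
		regionHeartBeatReportInterval = 60 // seconds
	)
	storeStatZombie := time.Duration(maxZombieRounds) * storeHeartBeatReportInterval * time.Second
	regionsStatZombie := time.Duration(maxZombieRounds) * regionHeartBeatReportInterval * time.Second
	fmt.Println(storeStatZombie, regionsStatZombie) // 30s 3m0s
}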
4 changes: 2 additions & 2 deletions server/schedulers/hot_region_test.go
@@ -98,7 +98,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
op.Start()
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
return newPendingInfluence(op, 2, 4, Influence{})
return newPendingInfluence(op, 2, 4, Influence{}, hb.conf.GetStoreStatZombieDuration())
}
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
@@ -1231,7 +1231,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {

if testcase.DegreeAfterTransferLeader >= 3 {
// try schedule
hb.prepareForBalance(tc)
hb.prepareForBalance(testcase.kind, tc)
leaderSolver := newBalanceSolver(hb, tc, testcase.kind, transferLeader)
leaderSolver.cur = &solution{srcStoreID: 2}
c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule
19 changes: 11 additions & 8 deletions server/schedulers/utils.go
@@ -17,6 +17,7 @@ import (
"math"
"net/url"
"strconv"
"time"

"github.com/montanaflynn/stats"
"github.com/pingcap/log"
@@ -220,17 +221,19 @@ func (lhs *Influence) add(rhs *Influence, w float64) *Influence {

// TODO: merge it into OperatorInfluence.
type pendingInfluence struct {
op *operator.Operator
from, to uint64
origin Influence
op *operator.Operator
from, to uint64
origin Influence
maxZombieDuration time.Duration
}

func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) *pendingInfluence {
func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, maxZombieDur time.Duration) *pendingInfluence {
return &pendingInfluence{
op: op,
from: from,
to: to,
origin: infl,
op: op,
from: from,
to: to,
origin: infl,
maxZombieDuration: maxZombieDur,
}
}
