scheduler: fix inaccurate statistics and config (#3949) #3965

Merged
110 changes: 64 additions & 46 deletions server/schedulers/hot_region.go
@@ -85,10 +85,8 @@ type hotScheduler struct {
name string
*BaseScheduler
sync.RWMutex
leaderLimit uint64
peerLimit uint64
types []rwType
r *rand.Rand
types []rwType
r *rand.Rand

// regionPendings stores regionID -> pendingInfluence
// this records regionID which have pending Operator by operation type. During filterHotPeers, the hot peers won't
@@ -108,8 +106,6 @@ func newHotScheduler(opController *schedule.OperatorController, conf *hotRegionS
ret := &hotScheduler{
name: HotRegionName,
BaseScheduler: base,
leaderLimit: 1,
peerLimit: 1,
types: []rwType{write, read},
r: rand.New(rand.NewSource(time.Now().UnixNano())),
regionPendings: make(map[uint64]*pendingInfluence),
@@ -171,7 +167,7 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
h.Lock()
defer h.Unlock()

h.prepareForBalance(cluster)
h.prepareForBalance(typ, cluster)

switch typ {
case read:
@@ -184,13 +180,15 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope

// prepareForBalance calculate the summary of pending Influence for each store and prepare the load detail for
// each store
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
func (h *hotScheduler) prepareForBalance(typ rwType, cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()

{ // update read statistics
switch typ {
case read:
// update read statistics
regionRead := cluster.RegionReadStats()
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
@@ -204,9 +202,8 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.pendingSums,
regionRead,
read, core.RegionKind)
}

{ // update write statistics
case write:
// update write statistics
regionWrite := cluster.RegionWriteStats()
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
@@ -228,9 +225,9 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
// and clean the region from regionInfluence if they have ended operator.
// It makes each key/byte rate or count become `weight` times to the origin value.
func (h *hotScheduler) summaryPendingInfluence() {
maxZombieDur := h.conf.GetMaxZombieDuration()
ret := make(map[uint64]*Influence)
for id, p := range h.regionPendings {
maxZombieDur := p.maxZombieDuration
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
if needGC {
delete(h.regionPendings, id)
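
The hunk above switches summaryPendingInfluence from one global zombie duration to the per-record `maxZombieDuration` carried by each pendingInfluence. A minimal, self-contained sketch of the idea follows; the helper below is an assumption for illustration, not the PR's calcPendingInfluence.

```go
package main

import (
	"fmt"
	"time"
)

// pendingWeight sketches how a pending operator's influence could be weighted:
// count it in full while the operator runs, keep counting it for maxZombieDur
// after it ends (the heartbeat-driven statistics lag behind the actual move),
// and only then drop the influence and garbage-collect the record.
func pendingWeight(stillRunning bool, endedAt time.Time, maxZombieDur time.Duration) (weight float64, needGC bool) {
	if stillRunning {
		return 1, false
	}
	if time.Since(endedAt) < maxZombieDur {
		return 1, false // finished, but the stats have not caught up yet
	}
	return 0, true // zombie window elapsed
}

func main() {
	// An operator that finished two minutes ago with a 30s zombie window.
	w, gc := pendingWeight(false, time.Now().Add(-2*time.Minute), 30*time.Second)
	fmt.Println(w, gc) // 0 true
}
```
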
@@ -266,8 +263,9 @@ func summaryStoresLoad(
) map[uint64]*storeLoadDetail {
// loadDetail stores the storeID -> hotPeers stat and its current and future stat(key/byte rate,count)
loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
allLoadSum := make([]float64, statistics.DimLen)
allCount := 0.0
allTiKVLoadSum := make([]float64, statistics.DimLen)
allTiKVCount := 0
allTiKVHotPeersCount := 0

// Stores without byte rate statistics is not available to schedule.
for _, store := range stores {
@@ -279,6 +277,8 @@
if kind == core.LeaderKind && !store.AllowLeaderTransfer() {
continue
}
isTiFlash := core.IsTiFlashStore(store.GetMeta())

loads := make([]float64, statistics.DimLen)
switch rwTy {
case read:
@@ -317,10 +317,14 @@
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.KeyDim])
}
}
for i := range allLoadSum {
allLoadSum[i] += loads[i]

if !isTiFlash {
for i := range allTiKVLoadSum {
allTiKVLoadSum[i] += loads[i]
}
allTiKVCount += 1
allTiKVHotPeersCount += len(hotPeers)
}
allCount += float64(len(hotPeers))

// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
@@ -335,16 +339,19 @@
HotPeers: hotPeers,
}
}
storeLen := float64(len(storesLoads))

expectLoads := make([]float64, len(allTiKVLoadSum))
for i := range expectLoads {
expectLoads[i] = allTiKVLoadSum[i] / float64(allTiKVCount)
}
expect := storeLoad{
Loads: expectLoads,
Count: float64(allTiKVHotPeersCount) / float64(allTiKVCount),
}

// store expectation byte/key rate and count for each store-load detail.
for id, detail := range loadDetail {
expectLoads := make([]float64, len(allLoadSum))
for i := range expectLoads {
expectLoads[i] = allLoadSum[i] / storeLen
}
expectCount := allCount / storeLen
detail.LoadPred.Expect.Loads = expectLoads
detail.LoadPred.Expect.Count = expectCount
detail.LoadPred.Expect = expect
// Debug
{
ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
@@ -356,7 +363,7 @@ }
}
{
ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectCount)
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expect.Count)
}
}
return loadDetail
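
To make the expectation change above concrete, here is a small standalone illustration (not code from this PR): the expected per-store load is now averaged over TiKV stores only, so TiFlash stores no longer drag the expectation down.

```go
package main

import "fmt"

// storeStat is a simplified stand-in for the per-store load data summarized above.
type storeStat struct {
	byteRate  float64
	isTiFlash bool
}

func main() {
	stores := []storeStat{
		{byteRate: 100},                 // TiKV
		{byteRate: 140},                 // TiKV
		{byteRate: 60},                  // TiKV
		{byteRate: 20, isTiFlash: true}, // TiFlash: excluded from the expectation
	}

	var tikvSum float64
	tikvCount := 0
	for _, s := range stores {
		if s.isTiFlash {
			continue
		}
		tikvSum += s.byteRate
		tikvCount++
	}

	// Before the fix the average was taken over len(storesLoads), i.e. every
	// store: (100+140+60+20)/4 = 80. After the fix only TiKV stores count:
	// (100+140+60)/3 = 100.
	fmt.Println(tikvSum / float64(tikvCount))
}
```
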
Expand All @@ -377,15 +384,15 @@ func filterHotPeers(
return ret
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence) bool {
func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_fails").Inc()
return false
}

influence := newPendingInfluence(op, srcStore, dstStore, infl)
influence := newPendingInfluence(op, srcStore, dstStore, infl, maxZombieDur)
h.regionPendings[regionID] = influence

schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
@@ -535,9 +542,9 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
bs.cur = &solution{}
var (
best *solution
ops []*operator.Operator
infls []Influence
best *solution
op *operator.Operator
infl Influence
)

for srcStoreID := range bs.filterSrcStores() {
@@ -553,9 +560,9 @@ func (bs *balanceSolver) solve() []*operator.Operator {
bs.cur.dstStoreID = dstStoreID
bs.calcProgressiveRank()
if bs.cur.progressiveRank < 0 && bs.betterThan(best) {
if newOps, newInfls := bs.buildOperators(); len(newOps) > 0 {
ops = newOps
infls = newInfls
if newOp, newInfl := bs.buildOperator(); newOp != nil {
op = newOp
infl = *newInfl
clone := *bs.cur
best = &clone
}
@@ -564,13 +571,25 @@
}
}

for i := 0; i < len(ops); i++ {
// TODO: multiple operators need to be atomic.
if !bs.sche.addPendingInfluence(ops[i], best.srcStoreID, best.dstStoreID, infls[i]) {
return nil
}
if best == nil {
return nil
}

// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
var maxZombieDur time.Duration
switch {
case bs.rwTy == write && bs.opTy == transferLeader:
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}

if !bs.sche.addPendingInfluence(op, best.srcStoreID, best.dstStoreID, infl, maxZombieDur) {
return nil
}
return ops

return []*operator.Operator{op}
}

// filterSrcStores compare the min rate and the ratio * expectation rate, if both key and byte rate is greater than
@@ -787,7 +806,7 @@ func (bs *balanceSolver) calcProgressiveRank() {
rank := int64(0)
if bs.rwTy == write && bs.opTy == transferLeader {
// In this condition, CPU usage is the matter.
// Only consider about key rate.
// Only consider key rate.
srcKeyRate := srcLd.Loads[statistics.KeyDim]
dstKeyRate := dstLd.Loads[statistics.KeyDim]
peerKeyRate := peer.GetLoad(getRegionStatKind(bs.rwTy, statistics.KeyDim))
@@ -998,12 +1017,11 @@ func (bs *balanceSolver) isReadyToBuild() bool {
return true
}

func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *Influence) {
if !bs.isReadyToBuild() {
return nil, nil
}
var (
op *operator.Operator
counters []prometheus.Counter
err error
)
@@ -1064,11 +1082,11 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "new-operator"),
schedulerCounter.WithLabelValues(bs.sche.GetName(), bs.opTy.String()))

infl := Influence{
infl = &Influence{
Loads: append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...),
Count: 1,
}
return []*operator.Operator{op}, []Influence{infl}
return op, infl
}

func (h *hotScheduler) GetHotStatus(typ string) *statistics.StoreHotPeersInfos {
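
One detail worth calling out from buildOperator above: `append(bs.cur.srcPeerStat.Loads[:0:0], bs.cur.srcPeerStat.Loads...)` is the usual Go idiom for taking an independent copy of a slice. A tiny standalone example of the same idiom:

```go
package main

import "fmt"

func main() {
	loads := []float64{1, 2, 3}

	// loads[:0:0] is a zero-length, zero-capacity slice, so the append below
	// must allocate a fresh backing array; the copy shares no memory with loads.
	copied := append(loads[:0:0], loads...)
	copied[0] = 99

	fmt.Println(loads, copied) // [1 2 3] [99 2 3]
}
```
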
11 changes: 8 additions & 3 deletions server/schedulers/hot_region_config.go
@@ -36,12 +36,12 @@ func initHotRegionScheduleConfig() *hotRegionSchedulerConfig {
MinHotByteRate: 100,
MinHotKeyRate: 10,
MaxZombieRounds: 3,
MaxPeerNum: 1000,
ByteRateRankStepRatio: 0.05,
KeyRateRankStepRatio: 0.05,
CountRankStepRatio: 0.01,
GreatDecRatio: 0.95,
MinorDecRatio: 0.99,
MaxPeerNum: 1000,
SrcToleranceRatio: 1.05, // Tolerate 5% difference
DstToleranceRatio: 1.05, // Tolerate 5% difference
}
@@ -73,12 +73,18 @@ func (conf *hotRegionSchedulerConfig) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(conf)
}

func (conf *hotRegionSchedulerConfig) GetMaxZombieDuration() time.Duration {
func (conf *hotRegionSchedulerConfig) GetStoreStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.StoreHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetRegionsStatZombieDuration() time.Duration {
conf.RLock()
defer conf.RUnlock()
return time.Duration(conf.MaxZombieRounds) * statistics.RegionHeartBeatReportInterval * time.Second
}

func (conf *hotRegionSchedulerConfig) GetMaxPeerNumber() int {
conf.RLock()
defer conf.RUnlock()
@@ -214,5 +220,4 @@ func (conf *hotRegionSchedulerConfig) persist() error {

}
return conf.storage.SaveScheduleConfig(HotRegionName, data)

}
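
For a sense of scale of the two zombie durations added above, a standalone sketch; the 10-second store and 60-second region heartbeat intervals are assumptions standing in for `statistics.StoreHeartBeatReportInterval` and `statistics.RegionHeartBeatReportInterval`, and only the formula comes from the diff.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed heartbeat report intervals, in seconds (stand-ins for the constants
// in the statistics package; they are not taken from this diff).
const (
	storeHeartBeatReportInterval  = 10
	regionHeartBeatReportInterval = 60
)

func main() {
	maxZombieRounds := 3 // default from initHotRegionScheduleConfig

	storeStatZombie := time.Duration(maxZombieRounds) * storeHeartBeatReportInterval * time.Second
	regionsStatZombie := time.Duration(maxZombieRounds) * regionHeartBeatReportInterval * time.Second

	// With the assumed intervals, operators judged by store-level statistics
	// are kept pending for 30s, while write-leader operators judged by
	// region-level statistics are kept for 3m0s.
	fmt.Println(storeStatZombie, regionsStatZombie)
}
```
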
4 changes: 2 additions & 2 deletions server/schedulers/hot_test.go
@@ -76,7 +76,7 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
op.Start()
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
return newPendingInfluence(op, 2, 4, Influence{})
return newPendingInfluence(op, 2, 4, Influence{}, hb.conf.GetStoreStatZombieDuration())
}
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
@@ -1306,7 +1306,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {

if testcase.DegreeAfterTransferLeader >= 3 {
// try schedule
hb.prepareForBalance(tc)
hb.prepareForBalance(testcase.kind, tc)
leaderSolver := newBalanceSolver(hb, tc, testcase.kind, transferLeader)
leaderSolver.cur = &solution{srcStoreID: 2}
c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule
19 changes: 11 additions & 8 deletions server/schedulers/utils.go
@@ -17,6 +17,7 @@ import (
"math"
"net/url"
"strconv"
"time"

"github.com/montanaflynn/stats"
"github.com/pingcap/log"
@@ -215,17 +216,19 @@ func (lhs *Influence) add(rhs *Influence, w float64) *Influence {

// TODO: merge it into OperatorInfluence.
type pendingInfluence struct {
op *operator.Operator
from, to uint64
origin Influence
op *operator.Operator
from, to uint64
origin Influence
maxZombieDuration time.Duration
}

func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) *pendingInfluence {
func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence, maxZombieDur time.Duration) *pendingInfluence {
return &pendingInfluence{
op: op,
from: from,
to: to,
origin: infl,
op: op,
from: from,
to: to,
origin: infl,
maxZombieDuration: maxZombieDur,
}
}

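
Finally, a simplified, self-contained illustration of how the scheduler folds pending influences together with a weight, as described by the summaryPendingInfluence comment earlier; the type below is a stand-in for the Influence type in this file, not an excerpt.

```go
package main

import "fmt"

// influence is a stand-in for schedulers.Influence: the loads and count a
// pending operator is expected to move, folded into the per-store totals
// scaled by a weight (1 while still relevant, 0 once it has gone zombie).
type influence struct {
	loads []float64
	count float64
}

func (lhs *influence) add(rhs *influence, w float64) *influence {
	for i := range lhs.loads {
		lhs.loads[i] += rhs.loads[i] * w
	}
	lhs.count += rhs.count * w
	return lhs
}

func main() {
	sum := &influence{loads: []float64{0, 0}}
	running := &influence{loads: []float64{1024, 256}, count: 1}
	zombie := &influence{loads: []float64{2048, 512}, count: 1}

	sum.add(running, 1) // still-relevant operator: full influence
	sum.add(zombie, 0)  // expired operator: contributes nothing

	fmt.Println(sum.loads, sum.count) // [1024 256] 1
}
```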