Skip to content

Commit

Permalink
add tiflash hot write scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: HunDunDM <hundundm@gmail.com>
  • Loading branch information
HunDunDM committed Jul 16, 2021
1 parent 1e84047 commit 2bc3c1a
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,14 +558,27 @@ func (bs *balanceSolver) solve() []*operator.Operator {
// its expectation * ratio, the store would be selected as hot source store
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail)
confSrcToleranceRatio := bs.sche.conf.GetSrcToleranceRatio()
confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash()
for id, detail := range bs.stLoadDetail {
srcToleranceRatio := confSrcToleranceRatio
if detail.IsTiFlash {
if !confEnableForTiFlash {
continue
}
if bs.rwTy != write || bs.opTy != movePeer {
continue
}
srcToleranceRatio = 0
}
if len(detail.HotPeers) == 0 {
continue
}

minLoad := detail.LoadPred.min()
if slice.AllOf(minLoad.Loads, func(i int) bool {
if statistics.IsSelectedDim(i) {
return minLoad.Loads[i] > bs.sche.conf.GetSrcToleranceRatio()*detail.LoadPred.Expect.Loads[i]
return minLoad.Loads[i] > srcToleranceRatio*detail.LoadPred.Expect.Loads[i]
}
return true
}) {
Expand Down Expand Up @@ -705,7 +718,7 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
var (
filters []filter.Filter
candidates []*core.StoreInfo
candidates []*storeLoadDetail
)
srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
switch bs.opTy {
Expand All @@ -718,7 +731,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
}

for _, detail := range bs.stLoadDetail {
candidates = append(candidates, detail.Store)
candidates = append(candidates, detail)
}

case transferLeader:
Expand All @@ -730,9 +743,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filters = append(filters, leaderFilter)
}

for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
if _, ok := bs.stLoadDetail[store.GetID()]; ok {
candidates = append(candidates, store)
for _, peer := range bs.cur.region.GetFollowers() {
if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
candidates = append(candidates, detail)
}
}

Expand All @@ -742,12 +755,23 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
return bs.pickDstStores(filters, candidates)
}

func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
for _, store := range candidates {
confDstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash()
for _, detail := range candidates {
store := detail.Store
dstToleranceRatio := confDstToleranceRatio
if detail.IsTiFlash {
if !confEnableForTiFlash {
continue
}
if bs.rwTy != write || bs.opTy != movePeer {
continue
}
dstToleranceRatio = 0
}
if filter.Target(bs.cluster.GetOpts(), store, filters) {
detail := bs.stLoadDetail[store.GetID()]
maxLoads := detail.LoadPred.max().Loads
if slice.AllOf(maxLoads, func(i int) bool {
if statistics.IsSelectedDim(i) {
Expand Down

0 comments on commit 2bc3c1a

Please sign in to comment.