scheduler: hot region should be split before moving if its size is too large (#6281)

* add createSplitOperator

Signed-off-by: bufferflies <1045931706@qq.com>

* consider string nil

Signed-off-by: bufferflies <1045931706@qq.com>

* add unit test for split buckets

Signed-off-by: bufferflies <1045931706@qq.com>

* enable split in hot region

Signed-off-by: bufferflies <1045931706@qq.com>

* lint

Signed-off-by: bufferflies <1045931706@qq.com>

* add stringer for resourceTy

Signed-off-by: bufferflies <1045931706@qq.com>

* remove region keys and hot split keys in additional info

Signed-off-by: bufferflies <1045931706@qq.com>

* pass unit test

Signed-off-by: bufferflies <1045931706@qq.com>

* address comments

Signed-off-by: bufferflies <1045931706@qq.com>

* remove one case from TestLoadKeyspaceGroupsAssignment

Signed-off-by: bufferflies <1045931706@qq.com>

* revert TestLoadKeyspaceGroupsAssignment

Signed-off-by: bufferflies <1045931706@qq.com>

---------

Signed-off-by: bufferflies <1045931706@qq.com>
Co-authored-by: Ryan Leung <rleungx@gmail.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
3 people authored Apr 23, 2023
1 parent 5257108 commit 1b75108
Showing 13 changed files with 336 additions and 57 deletions.
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -137,8 +137,8 @@ func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
}

// BucketsStats returns hot region's buckets stats.
func (mc *Cluster) BucketsStats(degree int) map[uint64][]*buckets.BucketStat {
task := buckets.NewCollectBucketStatsTask(degree)
func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat {
task := buckets.NewCollectBucketStatsTask(degree, regions...)
if !mc.HotBucketCache.CheckAsync(task) {
return nil
}
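The change above makes the region filter optional at the call site. As a rough caller-side sketch (illustrative, compile-only code that is not part of this commit; the helper name is made up), anything implementing the buckets.BucketStatInformer interface, including the mock cluster, can now be queried either for all regions or for a specific set of region IDs:

package sketch

import "github.com/tikv/pd/pkg/statistics/buckets"

// hotBucketsFor is a hypothetical helper showing the new call shape: with no
// IDs the behaviour is unchanged (stats for every cached region), with IDs
// the query is restricted to those regions.
func hotBucketsFor(informer buckets.BucketStatInformer, degree int, ids ...uint64) map[uint64][]*buckets.BucketStat {
	if len(ids) == 0 {
		return informer.BucketsStats(degree)
	}
	return informer.BucketsStats(degree, ids...)
}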
5 changes: 5 additions & 0 deletions pkg/schedule/operator/operator.go
@@ -103,6 +103,11 @@ func (o *Operator) String() string {
return s
}

// Brief returns the operator's short brief.
func (o *Operator) Brief() string {
return o.brief
}

// MarshalJSON serializes custom types to JSON.
func (o *Operator) MarshalJSON() ([]byte, error) {
return []byte(`"` + o.String() + `"`), nil
130 changes: 114 additions & 16 deletions pkg/schedule/schedulers/hot_region.go
@@ -15,7 +15,9 @@
package schedulers

import (
"bytes"
"fmt"

"math"
"math/rand"
"net/http"
@@ -24,6 +26,7 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/core"
Expand All @@ -35,24 +38,30 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/keyutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

var (
statisticsInterval = time.Second
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip")
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip")
hotSchedulerSearchRevertRegionsCounter = schedulerCounter.WithLabelValues(HotRegionName, "search_revert_regions")
hotSchedulerNotSameEngineCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_same_engine")
hotSchedulerNoRegionCounter = schedulerCounter.WithLabelValues(HotRegionName, "no_region")
hotSchedulerUnhealthyReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "unhealthy_replica")
hotSchedulerAbnormalReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "abnormal_replica")
hotSchedulerCreateOperatorFailedCounter = schedulerCounter.WithLabelValues(HotRegionName, "create_operator_failed")
hotSchedulerNewOperatorCounter = schedulerCounter.WithLabelValues(HotRegionName, "new_operator")
hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

// counter related with the split region
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerSearchRevertRegionsCounter = schedulerCounter.WithLabelValues(HotRegionName, "search_revert_regions")
hotSchedulerNotSameEngineCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_same_engine")
hotSchedulerNoRegionCounter = schedulerCounter.WithLabelValues(HotRegionName, "no_region")
hotSchedulerUnhealthyReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "unhealthy_replica")
hotSchedulerAbnormalReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "abnormal_replica")
hotSchedulerCreateOperatorFailedCounter = schedulerCounter.WithLabelValues(HotRegionName, "create_operator_failed")
hotSchedulerNewOperatorCounter = schedulerCounter.WithLabelValues(HotRegionName, "new_operator")
hotSchedulerSnapshotSenderLimit = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
@@ -196,6 +205,8 @@ var (
// If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension,
// as it implies that this dimension is sufficiently uniform.
stddevThreshold = 0.1

splitBucket = "split-hot-region"
)

type hotScheduler struct {
@@ -634,11 +645,8 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
} else if bs.opTy == movePeer {
if bs.cur.region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() {
hotSchedulerNeedSplitBeforeScheduleCounter.Inc()
continue
} else if !snapshotFilter.Select(bs.cur.region).IsOK() {
hotSchedulerSnapshotSenderLimit.Inc()
if !snapshotFilter.Select(bs.cur.region).IsOK() {
hotSchedulerSnapshotSenderLimitCounter.Inc()
continue
}
}
@@ -717,7 +725,7 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
return false
}
// revert peers
if bs.best.revertPeerStat != nil {
if bs.best.revertPeerStat != nil && len(bs.ops) > 1 {
infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
return false
@@ -1341,6 +1349,22 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
return nil
}

splitRegions := make([]*core.RegionInfo, 0)
if bs.opTy == movePeer {
for _, region := range []*core.RegionInfo{bs.cur.region, bs.cur.revertRegion} {
if region == nil {
continue
}
if region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() {
hotSchedulerNeedSplitBeforeScheduleCounter.Inc()
splitRegions = append(splitRegions, region)
}
}
}
if len(splitRegions) > 0 {
return bs.createSplitOperator(splitRegions)
}

srcStoreID := bs.cur.srcStore.GetID()
dstStoreID := bs.cur.dstStore.GetID()
sourceLabel := strconv.FormatUint(srcStoreID, 10)
@@ -1377,6 +1401,64 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
return
}

// createSplitOperator creates split operators for the given regions.
func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*operator.Operator {
if len(regions) == 0 {
return nil
}
ids := make([]uint64, len(regions))
for i, region := range regions {
ids[i] = region.GetID()
}
hotBuckets := bs.Cluster.BucketsStats(bs.minHotDegree, ids...)
operators := make([]*operator.Operator, 0)

createFunc := func(region *core.RegionInfo) {
stats, ok := hotBuckets[region.GetID()]
if !ok {
hotSchedulerRegionBucketsNotHotCounter.Inc()
return
}
startKey, endKey := region.GetStartKey(), region.GetEndKey()
splitKey := make([][]byte, 0)
for _, stat := range stats {
if keyutil.Between(startKey, endKey, stat.StartKey) && keyutil.Between(startKey, endKey, stat.EndKey) {
if len(splitKey) == 0 {
splitKey = append(splitKey, stat.StartKey, stat.EndKey)
continue
}
// If the last split key is equal to the current start key, we can merge them.
// E.g. [a, b), [b, c) -> [a, c) split keys is [a,c]
// Otherwise, we should append the current start key and end key.
// E.g. [a, b), [c, d) -> [a, b), [c, d) split keys is [a,b,c,d]
if bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) {
splitKey[len(splitKey)-1] = stat.EndKey
} else {
splitKey = append(splitKey, stat.StartKey, stat.EndKey)
}
}
}
if len(splitKey) == 0 {
hotSchedulerNotFoundSplitKeysCounter.Inc()
return
}
op, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, splitKey)
if err != nil {
log.Error("fail to create split operator",
zap.Stringer("resource-type", bs.resourceTy),
errs.ZapError(err))
return
}
hotSchedulerSplitSuccessCounter.Inc()
operators = append(operators, op)
}

for _, region := range regions {
createFunc(region)
}
return operators
}

func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
@@ -1550,6 +1632,22 @@ const (
resourceTypeLen
)

// String implements fmt.Stringer interface.
func (ty resourceType) String() string {
switch ty {
case writePeer:
return "write-peer"
case writeLeader:
return "write-leader"
case readPeer:
return "read-peer"
case readLeader:
return "read-leader"
default:
return ""
}
}

func toResourceType(rwTy statistics.RWType, opTy opType) resourceType {
switch rwTy {
case statistics.Write:
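To make the split-key construction in createSplitOperator easier to follow, here is a minimal standalone sketch (illustrative code, not part of the commit) of the merging rule described in the comments above: adjacent hot bucket ranges such as [a, b) and [b, c) collapse into [a, c), while disjoint ranges contribute both of their boundary keys. Run against the same bucket layout as the TestSplitBuckets case below, it yields the split keys a, c, d, f:

package main

import (
	"bytes"
	"fmt"
)

// splitKeysFromHotBuckets mimics the split-key accumulation loop in
// createSplitOperator: each hot bucket range either extends the previously
// collected range (when its start key equals the last collected key) or
// appends its own start and end keys. The region-boundary filtering done by
// keyutil.Between in the real code is omitted here.
func splitKeysFromHotBuckets(ranges [][2][]byte) [][]byte {
	splitKeys := make([][]byte, 0)
	for _, r := range ranges {
		start, end := r[0], r[1]
		if len(splitKeys) > 0 && bytes.Equal(start, splitKeys[len(splitKeys)-1]) {
			// [a, b) followed by [b, c) is treated as one hot range [a, c).
			splitKeys[len(splitKeys)-1] = end
			continue
		}
		splitKeys = append(splitKeys, start, end)
	}
	return splitKeys
}

func main() {
	// Hot buckets from the TestSplitBuckets fixture: [a,b), [b,c), [d,e), [e,f).
	hot := [][2][]byte{
		{[]byte("a"), []byte("b")},
		{[]byte("b"), []byte("c")},
		{[]byte("d"), []byte("e")},
		{[]byte("e"), []byte("f")},
	}
	for _, k := range splitKeysFromHotBuckets(hot) {
		fmt.Printf("%s ", k) // prints: a c d f
	}
	fmt.Println()
}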
43 changes: 43 additions & 0 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -22,13 +22,15 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/operatorutil"
@@ -200,6 +202,47 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
}

func TestSplitBuckets(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
cancel, _, tc, oc := prepareSchedulersTest()
tc.SetHotRegionCacheHitsThreshold(1)
defer cancel()
hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
solve := newBalanceSolver(hb.(*hotScheduler), tc, statistics.Read, transferLeader)
region := core.NewTestRegionInfo(1, 1, []byte(""), []byte(""))

// the hot range is [a,c],[e,f]
b := &metapb.Buckets{
RegionId: 1,
PeriodInMs: 1000,
Keys: [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")},
Stats: &metapb.BucketStats{
ReadBytes: []uint64{10 * units.KiB, 10 * units.KiB, 0, 10 * units.KiB, 10 * units.KiB},
ReadKeys: []uint64{256, 256, 0, 256, 256},
ReadQps: []uint64{0, 0, 0, 0, 0},
WriteBytes: []uint64{0, 0, 0, 0, 0},
WriteQps: []uint64{0, 0, 0, 0, 0},
WriteKeys: []uint64{0, 0, 0, 0, 0},
},
}

task := buckets.NewCheckPeerTask(b)
re.True(tc.HotBucketCache.CheckAsync(task))
time.Sleep(time.Millisecond * 10)
ops := solve.createSplitOperator([]*core.RegionInfo{region})
re.Equal(1, len(ops))
op := ops[0]
re.Equal(splitBucket, op.Desc())
expectKeys := [][]byte{[]byte("a"), []byte("c"), []byte("d"), []byte("f")}
expectOp, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys)
re.NoError(err)
expectOp.GetCreateTime()
re.Equal(expectOp.Brief(), op.Brief())
re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo())
}

func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) {
cancel, opt, tc, oc := prepareSchedulersTest()
defer cancel()
8 changes: 5 additions & 3 deletions pkg/statistics/buckets/bucket_stat_informer.go
@@ -35,7 +35,7 @@ func init() {

// BucketStatInformer is used to get the bucket statistics.
type BucketStatInformer interface {
BucketsStats(degree int) map[uint64][]*BucketStat
BucketsStats(degree int, regions ...uint64) map[uint64][]*BucketStat
}

// BucketStat is the record the bucket statistics.
@@ -190,11 +190,13 @@ func (b *BucketTreeItem) calculateHotDegree() {
// TODO: qps should be considered, tikv will report this in next sprint
// the order: read [bytes keys qps] and write[bytes keys qps]
readLoads := stat.Loads[:2]
readHot := slice.AllOf(readLoads, func(i int) bool {
// keep same with the hot region hot degree
// https://github.com/tikv/pd/blob/6f6f545a6716840f7e2c7f4d8ed9b49f613a5cd8/pkg/statistics/hot_peer_cache.go#L220-L222
readHot := slice.AnyOf(readLoads, func(i int) bool {
return readLoads[i] > minHotThresholds[i]
})
writeLoads := stat.Loads[3:5]
writeHot := slice.AllOf(writeLoads, func(i int) bool {
writeHot := slice.AnyOf(writeLoads, func(i int) bool {
return writeLoads[i] > minHotThresholds[3+i]
})
hot := readHot || writeHot
16 changes: 14 additions & 2 deletions pkg/statistics/buckets/hot_bucket_cache.go
@@ -54,9 +54,9 @@ type HotBucketCache struct {
}

// GetHotBucketStats returns the hot stats of the regions that great than degree.
func (h *HotBucketCache) GetHotBucketStats(degree int) map[uint64][]*BucketStat {
func (h *HotBucketCache) GetHotBucketStats(degree int, regions []uint64) map[uint64][]*BucketStat {
rst := make(map[uint64][]*BucketStat)
for _, item := range h.bucketsOfRegion {
appendItems := func(item *BucketTreeItem) {
stats := make([]*BucketStat, 0)
for _, b := range item.stats {
if b.HotDegree >= degree {
@@ -67,6 +67,18 @@ func (h *HotBucketCache) GetHotBucketStats(degree int) map[uint64][]*BucketStat
rst[item.regionID] = stats
}
}
if len(regions) == 0 {
for _, item := range h.bucketsOfRegion {
appendItems(item)
}
} else {
for _, region := range regions {
if item, ok := h.bucketsOfRegion[region]; ok {
appendItems(item)
}
}
}

return rst
}

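The convention introduced here is that an empty region list means "return stats for every cached region", while a non-empty list restricts the result to the given region IDs. A small self-contained sketch of that branch (illustrative code only, using plain maps in place of the real cache types):

package main

import "fmt"

// filterByRegions mirrors the branch added to GetHotBucketStats above: with
// no region IDs every cached entry is returned, otherwise only the entries
// whose region ID appears in the list and exists in the cache.
func filterByRegions(cache map[uint64][]string, regions []uint64) map[uint64][]string {
	rst := make(map[uint64][]string)
	if len(regions) == 0 {
		for id, stats := range cache {
			rst[id] = stats
		}
		return rst
	}
	for _, id := range regions {
		if stats, ok := cache[id]; ok {
			rst[id] = stats
		}
	}
	return rst
}

func main() {
	cache := map[uint64][]string{1: {"[a,b)"}, 2: {"[c,d)"}, 3: {"[e,f)"}}
	fmt.Println(len(filterByRegions(cache, nil)))             // 3: all regions
	fmt.Println(len(filterByRegions(cache, []uint64{2, 9}))) // 1: only region 2 is cached
}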
6 changes: 4 additions & 2 deletions pkg/statistics/buckets/hot_bucket_task.go
@@ -66,13 +66,15 @@ func (t *checkBucketsTask) runTask(cache *HotBucketCache) {

type collectBucketStatsTask struct {
minDegree int
regions []uint64
ret chan map[uint64][]*BucketStat // RegionID ==>Buckets
}

// NewCollectBucketStatsTask creates task to collect bucket stats.
func NewCollectBucketStatsTask(minDegree int) *collectBucketStatsTask {
func NewCollectBucketStatsTask(minDegree int, regions ...uint64) *collectBucketStatsTask {
return &collectBucketStatsTask{
minDegree: minDegree,
regions: regions,
ret: make(chan map[uint64][]*BucketStat, 1),
}
}
@@ -82,7 +84,7 @@ func (t *collectBucketStatsTask) taskType() flowItemTaskKind {
}

func (t *collectBucketStatsTask) runTask(cache *HotBucketCache) {
t.ret <- cache.GetHotBucketStats(t.minDegree)
t.ret <- cache.GetHotBucketStats(t.minDegree, t.regions)
}

// WaitRet returns the result of the task.
(diffs for the remaining changed files are not shown here)
