diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 6ff059d5fae..f1d9e42baf6 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -137,8 +137,8 @@ func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { } // BucketsStats returns hot region's buckets stats. -func (mc *Cluster) BucketsStats(degree int) map[uint64][]*buckets.BucketStat { - task := buckets.NewCollectBucketStatsTask(degree) +func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat { + task := buckets.NewCollectBucketStatsTask(degree, regions...) if !mc.HotBucketCache.CheckAsync(task) { return nil } diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go index c69e266308e..4a9f9e8cbbd 100644 --- a/pkg/schedule/operator/operator.go +++ b/pkg/schedule/operator/operator.go @@ -103,6 +103,11 @@ func (o *Operator) String() string { return s } +// Brief returns the operator's short brief. +func (o *Operator) Brief() string { + return o.brief +} + // MarshalJSON serializes custom types to JSON. func (o *Operator) MarshalJSON() ([]byte, error) { return []byte(`"` + o.String() + `"`), nil diff --git a/pkg/schedule/schedulers/hot_region.go b/pkg/schedule/schedulers/hot_region.go index ee50080f1a5..00c5e8981a9 100644 --- a/pkg/schedule/schedulers/hot_region.go +++ b/pkg/schedule/schedulers/hot_region.go @@ -15,7 +15,9 @@ package schedulers import ( + "bytes" "fmt" + "math" "math/rand" "net/http" @@ -24,6 +26,7 @@ import ( "time" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/core" @@ -35,6 +38,7 @@ import ( "github.com/tikv/pd/pkg/schedule/plan" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/utils/keyutil" "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" ) @@ -42,17 +46,22 @@ import ( var ( statisticsInterval = time.Second // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
- hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule") - hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip") + hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule") + hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip") + hotSchedulerSearchRevertRegionsCounter = schedulerCounter.WithLabelValues(HotRegionName, "search_revert_regions") + hotSchedulerNotSameEngineCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_same_engine") + hotSchedulerNoRegionCounter = schedulerCounter.WithLabelValues(HotRegionName, "no_region") + hotSchedulerUnhealthyReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "unhealthy_replica") + hotSchedulerAbnormalReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "abnormal_replica") + hotSchedulerCreateOperatorFailedCounter = schedulerCounter.WithLabelValues(HotRegionName, "create_operator_failed") + hotSchedulerNewOperatorCounter = schedulerCounter.WithLabelValues(HotRegionName, "new_operator") + hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit") + + // counter related with the split region + hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys") + hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot") + hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success") hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer") - hotSchedulerSearchRevertRegionsCounter = schedulerCounter.WithLabelValues(HotRegionName, "search_revert_regions") - hotSchedulerNotSameEngineCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_same_engine") - hotSchedulerNoRegionCounter = schedulerCounter.WithLabelValues(HotRegionName, "no_region") - hotSchedulerUnhealthyReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "unhealthy_replica") - hotSchedulerAbnormalReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "abnormal_replica") - hotSchedulerCreateOperatorFailedCounter = schedulerCounter.WithLabelValues(HotRegionName, "create_operator_failed") - hotSchedulerNewOperatorCounter = schedulerCounter.WithLabelValues(HotRegionName, "new_operator") - hotSchedulerSnapshotSenderLimit = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit") hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String()) hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String()) @@ -196,6 +205,8 @@ var ( // If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension, // as it implies that this dimension is sufficiently uniform. 
stddevThreshold = 0.1 + + splitBucket = "split-hot-region" ) type hotScheduler struct { @@ -634,11 +645,8 @@ func (bs *balanceSolver) solve() []*operator.Operator { if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil { continue } else if bs.opTy == movePeer { - if bs.cur.region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() { - hotSchedulerNeedSplitBeforeScheduleCounter.Inc() - continue - } else if !snapshotFilter.Select(bs.cur.region).IsOK() { - hotSchedulerSnapshotSenderLimit.Inc() + if !snapshotFilter.Select(bs.cur.region).IsOK() { + hotSchedulerSnapshotSenderLimitCounter.Inc() continue } } @@ -717,7 +725,7 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool { return false } // revert peers - if bs.best.revertPeerStat != nil { + if bs.best.revertPeerStat != nil && len(bs.ops) > 1 { infl := bs.collectPendingInfluence(bs.best.revertPeerStat) if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) { return false @@ -1341,6 +1349,22 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) { return nil } + splitRegions := make([]*core.RegionInfo, 0) + if bs.opTy == movePeer { + for _, region := range []*core.RegionInfo{bs.cur.region, bs.cur.revertRegion} { + if region == nil { + continue + } + if region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() { + hotSchedulerNeedSplitBeforeScheduleCounter.Inc() + splitRegions = append(splitRegions, region) + } + } + } + if len(splitRegions) > 0 { + return bs.createSplitOperator(splitRegions) + } + srcStoreID := bs.cur.srcStore.GetID() dstStoreID := bs.cur.dstStore.GetID() sourceLabel := strconv.FormatUint(srcStoreID, 10) @@ -1377,6 +1401,64 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) { return } +// createSplitOperator creates split operators for the given regions. +func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*operator.Operator { + if len(regions) == 0 { + return nil + } + ids := make([]uint64, len(regions)) + for i, region := range regions { + ids[i] = region.GetID() + } + hotBuckets := bs.Cluster.BucketsStats(bs.minHotDegree, ids...) + operators := make([]*operator.Operator, 0) + + createFunc := func(region *core.RegionInfo) { + stats, ok := hotBuckets[region.GetID()] + if !ok { + hotSchedulerRegionBucketsNotHotCounter.Inc() + return + } + startKey, endKey := region.GetStartKey(), region.GetEndKey() + splitKey := make([][]byte, 0) + for _, stat := range stats { + if keyutil.Between(startKey, endKey, stat.StartKey) && keyutil.Between(startKey, endKey, stat.EndKey) { + if len(splitKey) == 0 { + splitKey = append(splitKey, stat.StartKey, stat.EndKey) + continue + } + // If the last split key is equal to the current start key, we can merge them. + // E.g. [a, b), [b, c) -> [a, c) split keys is [a,c] + // Otherwise, we should append the current start key and end key. + // E.g. 
[a, b), [c, d) -> [a, b), [c, d) split keys is [a,b,c,d] + if bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) { + splitKey[len(splitKey)-1] = stat.EndKey + } else { + splitKey = append(splitKey, stat.StartKey, stat.EndKey) + } + } + } + if len(splitKey) == 0 { + hotSchedulerNotFoundSplitKeysCounter.Inc() + return + } + op, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, splitKey) + if err != nil { + log.Error("fail to create split operator", + zap.Stringer("resource-type", bs.resourceTy), + errs.ZapError(err)) + return + } + hotSchedulerSplitSuccessCounter.Inc() + operators = append(operators, op) + } + + for _, region := range regions { + createFunc(region) + } + return operators +} + func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) { if region.GetStorePeer(dstStoreID) != nil { typ = "transfer-leader" @@ -1550,6 +1632,22 @@ const ( resourceTypeLen ) +// String implements fmt.Stringer interface. +func (ty resourceType) String() string { + switch ty { + case writePeer: + return "write-peer" + case writeLeader: + return "write-leader" + case readPeer: + return "read-peer" + case readLeader: + return "read-leader" + default: + return "" + } +} + func toResourceType(rwTy statistics.RWType, opTy opType) resourceType { switch rwTy { case statistics.Write: diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 71b7805f1b9..bc46fe29fe6 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -22,6 +22,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockcluster" @@ -29,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/statistics" + "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/operatorutil" @@ -200,6 +202,47 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) { checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */) } +func TestSplitBuckets(t *testing.T) { + re := require.New(t) + statistics.Denoising = false + cancel, _, tc, oc := prepareSchedulersTest() + tc.SetHotRegionCacheHitsThreshold(1) + defer cancel() + hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil) + re.NoError(err) + solve := newBalanceSolver(hb.(*hotScheduler), tc, statistics.Read, transferLeader) + region := core.NewTestRegionInfo(1, 1, []byte(""), []byte("")) + + // the hot range is [a,c],[e,f] + b := &metapb.Buckets{ + RegionId: 1, + PeriodInMs: 1000, + Keys: [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")}, + Stats: &metapb.BucketStats{ + ReadBytes: []uint64{10 * units.KiB, 10 * units.KiB, 0, 10 * units.KiB, 10 * units.KiB}, + ReadKeys: []uint64{256, 256, 0, 256, 256}, + ReadQps: []uint64{0, 0, 0, 0, 0}, + WriteBytes: []uint64{0, 0, 0, 0, 0}, + WriteQps: []uint64{0, 0, 0, 0, 0}, + WriteKeys: []uint64{0, 0, 0, 0, 0}, + }, + } + + task := buckets.NewCheckPeerTask(b) + re.True(tc.HotBucketCache.CheckAsync(task)) + time.Sleep(time.Millisecond * 10) + ops := 
solve.createSplitOperator([]*core.RegionInfo{region}) + re.Equal(1, len(ops)) + op := ops[0] + re.Equal(splitBucket, op.Desc()) + expectKeys := [][]byte{[]byte("a"), []byte("c"), []byte("d"), []byte("f")} + expectOp, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys) + re.NoError(err) + expectOp.GetCreateTime() + re.Equal(expectOp.Brief(), op.Brief()) + re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo()) +} + func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) { cancel, opt, tc, oc := prepareSchedulersTest() defer cancel() diff --git a/pkg/statistics/buckets/bucket_stat_informer.go b/pkg/statistics/buckets/bucket_stat_informer.go index f01ad5f0978..ba917a864c7 100644 --- a/pkg/statistics/buckets/bucket_stat_informer.go +++ b/pkg/statistics/buckets/bucket_stat_informer.go @@ -35,7 +35,7 @@ func init() { // BucketStatInformer is used to get the bucket statistics. type BucketStatInformer interface { - BucketsStats(degree int) map[uint64][]*BucketStat + BucketsStats(degree int, regions ...uint64) map[uint64][]*BucketStat } // BucketStat is the record the bucket statistics. @@ -190,11 +190,13 @@ func (b *BucketTreeItem) calculateHotDegree() { // TODO: qps should be considered, tikv will report this in next sprint // the order: read [bytes keys qps] and write[bytes keys qps] readLoads := stat.Loads[:2] - readHot := slice.AllOf(readLoads, func(i int) bool { + // keep same with the hot region hot degree + // https://github.com/tikv/pd/blob/6f6f545a6716840f7e2c7f4d8ed9b49f613a5cd8/pkg/statistics/hot_peer_cache.go#L220-L222 + readHot := slice.AnyOf(readLoads, func(i int) bool { return readLoads[i] > minHotThresholds[i] }) writeLoads := stat.Loads[3:5] - writeHot := slice.AllOf(writeLoads, func(i int) bool { + writeHot := slice.AnyOf(writeLoads, func(i int) bool { return writeLoads[i] > minHotThresholds[3+i] }) hot := readHot || writeHot diff --git a/pkg/statistics/buckets/hot_bucket_cache.go b/pkg/statistics/buckets/hot_bucket_cache.go index 15148e2dd97..1be3033f314 100644 --- a/pkg/statistics/buckets/hot_bucket_cache.go +++ b/pkg/statistics/buckets/hot_bucket_cache.go @@ -54,9 +54,9 @@ type HotBucketCache struct { } // GetHotBucketStats returns the hot stats of the regions that great than degree. 
-func (h *HotBucketCache) GetHotBucketStats(degree int) map[uint64][]*BucketStat { +func (h *HotBucketCache) GetHotBucketStats(degree int, regions []uint64) map[uint64][]*BucketStat { rst := make(map[uint64][]*BucketStat) - for _, item := range h.bucketsOfRegion { + appendItems := func(item *BucketTreeItem) { stats := make([]*BucketStat, 0) for _, b := range item.stats { if b.HotDegree >= degree { @@ -67,6 +67,18 @@ func (h *HotBucketCache) GetHotBucketStats(degree int) map[uint64][]*BucketStat rst[item.regionID] = stats } } + if len(regions) == 0 { + for _, item := range h.bucketsOfRegion { + appendItems(item) + } + } else { + for _, region := range regions { + if item, ok := h.bucketsOfRegion[region]; ok { + appendItems(item) + } + } + } + return rst } diff --git a/pkg/statistics/buckets/hot_bucket_task.go b/pkg/statistics/buckets/hot_bucket_task.go index 4ceecdefd01..a779fbe80ed 100644 --- a/pkg/statistics/buckets/hot_bucket_task.go +++ b/pkg/statistics/buckets/hot_bucket_task.go @@ -66,13 +66,15 @@ func (t *checkBucketsTask) runTask(cache *HotBucketCache) { type collectBucketStatsTask struct { minDegree int + regions []uint64 ret chan map[uint64][]*BucketStat // RegionID ==>Buckets } // NewCollectBucketStatsTask creates task to collect bucket stats. -func NewCollectBucketStatsTask(minDegree int) *collectBucketStatsTask { +func NewCollectBucketStatsTask(minDegree int, regions ...uint64) *collectBucketStatsTask { return &collectBucketStatsTask{ minDegree: minDegree, + regions: regions, ret: make(chan map[uint64][]*BucketStat, 1), } } @@ -82,7 +84,7 @@ func (t *collectBucketStatsTask) taskType() flowItemTaskKind { } func (t *collectBucketStatsTask) runTask(cache *HotBucketCache) { - t.ret <- cache.GetHotBucketStats(t.minDegree) + t.ret <- cache.GetHotBucketStats(t.minDegree, t.regions) } // WaitRet returns the result of the task. diff --git a/pkg/utils/keyutil/util.go b/pkg/utils/keyutil/util.go index 196369bdef2..a5513d4108d 100644 --- a/pkg/utils/keyutil/util.go +++ b/pkg/utils/keyutil/util.go @@ -40,3 +40,29 @@ func MinKey(a, b []byte) []byte { } return a } + +type boundary int + +const ( + left boundary = iota + right +) + +// less returns true if a < b. +// If the key is empty and the boundary is right, the keys is infinite. +func less(a, b []byte, boundary boundary) bool { + ret := bytes.Compare(a, b) + if ret < 0 { + return true + } + if boundary == right && len(b) == 0 && len(a) > 0 { + return true + } + return false +} + +// Between returns true if startKey < key < endKey. +// If the key is empty and the boundary is right, the keys is infinite. 
+func Between(startKey, endKey, key []byte) bool { + return less(startKey, key, left) && less(key, endKey, right) +} diff --git a/pkg/utils/keyutil/util_test.go b/pkg/utils/keyutil/util_test.go index dc149d9c81e..374faa1f797 100644 --- a/pkg/utils/keyutil/util_test.go +++ b/pkg/utils/keyutil/util_test.go @@ -28,3 +28,104 @@ func TestKeyUtil(t *testing.T) { key := BuildKeyRangeKey(startKey, endKey) re.Equal("61-62", key) } + +func TestLess(t *testing.T) { + re := require.New(t) + TestData := []struct { + a []byte + b []byte + boundary boundary + expect bool + }{ + { + []byte("a"), + []byte("b"), + left, + true, + }, + { + []byte("a"), + []byte("b"), + right, + true, + }, + { + []byte("a"), + []byte(""), + left, + false, + }, + { + []byte("a"), + []byte(""), + right, + true, + }, + { + []byte("a"), + []byte("a"), + right, + false, + }, + { + []byte(""), + []byte(""), + right, + false, + }, + { + []byte(""), + []byte(""), + left, + false, + }, + } + for _, data := range TestData { + re.Equal(data.expect, less(data.a, data.b, data.boundary)) + } +} + +func TestBetween(t *testing.T) { + re := require.New(t) + TestData := []struct { + startKey []byte + endKey []byte + key []byte + + expect bool + }{ + { + []byte("a"), + []byte("c"), + []byte("b"), + true, + }, + { + []byte("a"), + []byte("c"), + []byte("c"), + false, + }, + { + []byte("a"), + []byte(""), + []byte("b"), + true, + }, + { + []byte("a"), + []byte(""), + []byte(""), + false, + }, + { + []byte("a"), + []byte(""), + []byte("a"), + false, + }, + } + for _, data := range TestData { + re.Equal(data.expect, Between(data.startKey, data.endKey, data.key)) + } +} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 14c3523ba02..c2bc7a908b0 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -2286,8 +2286,8 @@ func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat { } // BucketsStats returns hot region's buckets stats. -func (c *RaftCluster) BucketsStats(degree int) map[uint64][]*buckets.BucketStat { - task := buckets.NewCollectBucketStatsTask(degree) +func (c *RaftCluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat { + task := buckets.NewCollectBucketStatsTask(degree, regions...) 
if !c.hotBuckets.CheckAsync(task) { return nil } diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index b7652447678..722fb3bfebe 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -713,7 +713,6 @@ func TestAddScheduler(t *testing.T) { re.NoError(co.removeScheduler(schedulers.BalanceLeaderName)) re.NoError(co.removeScheduler(schedulers.BalanceRegionName)) re.NoError(co.removeScheduler(schedulers.HotRegionName)) - re.NoError(co.removeScheduler(schedulers.SplitBucketName)) re.NoError(co.removeScheduler(schedulers.BalanceWitnessName)) re.NoError(co.removeScheduler(schedulers.TransferWitnessLeaderName)) re.Empty(co.schedulers) @@ -786,12 +785,12 @@ func TestPersistScheduler(t *testing.T) { tc, co, cleanup := prepare(nil, nil, func(co *coordinator) { co.run() }, re) hbStreams := co.hbStreams defer cleanup() - + defaultCount := len(config.DefaultSchedulers) // Add stores 1,2 re.NoError(tc.addLeaderStore(1, 1)) re.NoError(tc.addLeaderStore(2, 1)) - re.Len(co.schedulers, 6) + re.Len(co.schedulers, defaultCount) oc := co.opController storage := tc.RaftCluster.storage @@ -801,17 +800,18 @@ func TestPersistScheduler(t *testing.T) { evict, err := schedule.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"})) re.NoError(err) re.NoError(co.addScheduler(evict, "2")) - re.Len(co.schedulers, 8) + re.Len(co.schedulers, defaultCount+2) sches, _, err := storage.LoadAllScheduleConfig() re.NoError(err) - re.Len(sches, 8) + re.Len(sches, defaultCount+2) + + // remove 5 schedulers re.NoError(co.removeScheduler(schedulers.BalanceLeaderName)) re.NoError(co.removeScheduler(schedulers.BalanceRegionName)) re.NoError(co.removeScheduler(schedulers.HotRegionName)) - re.NoError(co.removeScheduler(schedulers.SplitBucketName)) re.NoError(co.removeScheduler(schedulers.BalanceWitnessName)) re.NoError(co.removeScheduler(schedulers.TransferWitnessLeaderName)) - re.Len(co.schedulers, 2) + re.Len(co.schedulers, defaultCount-3) re.NoError(co.cluster.opt.Persist(storage)) co.stop() co.wg.Wait() @@ -826,7 +826,7 @@ func TestPersistScheduler(t *testing.T) { defer func() { config.DefaultSchedulers = config.DefaultSchedulers[:len(config.DefaultSchedulers)-1] }() - re.Len(newOpt.GetSchedulers(), 6) + re.Len(newOpt.GetSchedulers(), defaultCount) re.NoError(newOpt.Reload(storage)) // only remains 3 items with independent config. sches, _, err = storage.LoadAllScheduleConfig() @@ -834,7 +834,7 @@ func TestPersistScheduler(t *testing.T) { re.Len(sches, 3) // option have 6 items because the default scheduler do not remove. 
- re.Len(newOpt.GetSchedulers(), 9) + re.Len(newOpt.GetSchedulers(), defaultCount+3) re.NoError(newOpt.Persist(storage)) tc.RaftCluster.opt = newOpt @@ -857,14 +857,14 @@ func TestPersistScheduler(t *testing.T) { brs, err := schedule.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedule.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) re.NoError(err) re.NoError(co.addScheduler(brs)) - re.Len(co.schedulers, 5) + re.Len(co.schedulers, defaultCount) // the scheduler option should contain 6 items // the `hot scheduler` are disabled - re.Len(co.cluster.opt.GetSchedulers(), 9) + re.Len(co.cluster.opt.GetSchedulers(), defaultCount+3) re.NoError(co.removeScheduler(schedulers.GrantLeaderName)) // the scheduler that is not enable by default will be completely deleted - re.Len(co.cluster.opt.GetSchedulers(), 8) + re.Len(co.cluster.opt.GetSchedulers(), defaultCount+2) re.Len(co.schedulers, 4) re.NoError(co.cluster.opt.Persist(co.cluster.storage)) co.stop() @@ -876,9 +876,9 @@ func TestPersistScheduler(t *testing.T) { co = newCoordinator(ctx, tc.RaftCluster, hbStreams) co.run() - re.Len(co.schedulers, 4) + re.Len(co.schedulers, defaultCount-1) re.NoError(co.removeScheduler(schedulers.EvictLeaderName)) - re.Len(co.schedulers, 3) + re.Len(co.schedulers, defaultCount-2) } func TestRemoveScheduler(t *testing.T) { @@ -895,25 +895,25 @@ func TestRemoveScheduler(t *testing.T) { // Add stores 1,2 re.NoError(tc.addLeaderStore(1, 1)) re.NoError(tc.addLeaderStore(2, 1)) + defaultCount := len(config.DefaultSchedulers) - re.Len(co.schedulers, 6) + re.Len(co.schedulers, defaultCount) oc := co.opController storage := tc.RaftCluster.storage gls1, err := schedule.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedule.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"})) re.NoError(err) re.NoError(co.addScheduler(gls1, "1")) - re.Len(co.schedulers, 7) + re.Len(co.schedulers, defaultCount+1) sches, _, err := storage.LoadAllScheduleConfig() re.NoError(err) - re.Len(sches, 7) + re.Len(sches, defaultCount+1) // remove all schedulers re.NoError(co.removeScheduler(schedulers.BalanceLeaderName)) re.NoError(co.removeScheduler(schedulers.BalanceRegionName)) re.NoError(co.removeScheduler(schedulers.HotRegionName)) re.NoError(co.removeScheduler(schedulers.GrantLeaderName)) - re.NoError(co.removeScheduler(schedulers.SplitBucketName)) re.NoError(co.removeScheduler(schedulers.BalanceWitnessName)) re.NoError(co.removeScheduler(schedulers.TransferWitnessLeaderName)) // all removed @@ -934,7 +934,8 @@ func TestRemoveScheduler(t *testing.T) { co.run() re.Empty(co.schedulers) // the option remains default scheduler - re.Len(co.cluster.opt.GetSchedulers(), 6) + + re.Len(co.cluster.opt.GetSchedulers(), defaultCount) co.stop() co.wg.Wait() } diff --git a/server/config/config.go b/server/config/config.go index 92042c73187..e2d8b95976f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -969,7 +969,6 @@ var DefaultSchedulers = SchedulerConfigs{ {Type: "balance-leader"}, {Type: "balance-witness"}, {Type: "hot-region"}, - {Type: "split-bucket"}, {Type: "transfer-witness-leader"}, } diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 12813c8b097..aced6750a11 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -481,14 +481,4 @@ func TestScheduler(t *testing.T) { err = leaderServer.GetServer().SetScheduleConfig(*cfg) re.NoError(err) checkSchedulerWithStatusCommand(nil, "disabled", 
nil) - - // test split bucket scheduler - echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "split-bucket-scheduler"}, nil) - re.Contains(echo, "\"degree\": 3") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "split-bucket-scheduler", "set", "degree", "10"}, nil) - re.Contains(echo, "Success") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "split-bucket-scheduler"}, nil) - re.Contains(echo, "\"degree\": 10") - echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "split-bucket-scheduler"}, nil) - re.Contains(echo, "Success!") }
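
The key-merging rule that createSplitOperator applies to hot bucket ranges can be hard to follow from the inline comments alone. Below is a minimal standalone sketch of the same rule, written against plain byte-slice ranges rather than *buckets.BucketStat (the keyRange type and mergeHotRanges name are illustrative, and the keyutil.Between filter against the region boundaries is omitted): adjacent hot ranges are coalesced, and each disjoint hot range contributes its own start and end key, which reproduces the ["a" "c" "d" "f"] split keys expected by TestSplitBuckets.

// A sketch of the split-key merging rule under the assumptions above.
package main

import (
	"bytes"
	"fmt"
)

type keyRange struct {
	start, end []byte
}

func mergeHotRanges(ranges []keyRange) [][]byte {
	splitKeys := make([][]byte, 0)
	for _, r := range ranges {
		if len(splitKeys) == 0 {
			splitKeys = append(splitKeys, r.start, r.end)
			continue
		}
		// [a, b) followed by [b, c) collapses into [a, c); otherwise the
		// new range contributes its own pair of keys.
		if bytes.Equal(r.start, splitKeys[len(splitKeys)-1]) {
			splitKeys[len(splitKeys)-1] = r.end
		} else {
			splitKeys = append(splitKeys, r.start, r.end)
		}
	}
	return splitKeys
}

func main() {
	// The hot buckets from TestSplitBuckets: [a,b), [b,c), [d,e), [e,f).
	hot := []keyRange{
		{[]byte("a"), []byte("b")},
		{[]byte("b"), []byte("c")},
		{[]byte("d"), []byte("e")},
		{[]byte("e"), []byte("f")},
	}
	fmt.Printf("%q\n", mergeHotRanges(hot)) // ["a" "c" "d" "f"]
}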
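
The new keyutil.Between helper is what lets the bucket filtering above handle the last region range, whose end key is empty. A short usage sketch of the exported helper (assuming this patch is applied), mirroring the table-driven cases in TestBetween: an empty end key behaves like +infinity on the right boundary, while the end key itself and the empty key are not considered inside the range.

// Usage sketch for keyutil.Between; outputs follow the TestBetween cases.
package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/utils/keyutil"
)

func main() {
	fmt.Println(keyutil.Between([]byte("a"), []byte("c"), []byte("b"))) // true: "a" < "b" < "c"
	fmt.Println(keyutil.Between([]byte("a"), []byte(""), []byte("z")))  // true: empty end key acts as +infinity
	fmt.Println(keyutil.Between([]byte("a"), []byte("c"), []byte("c"))) // false: the end key is exclusive
	fmt.Println(keyutil.Between([]byte("a"), []byte(""), []byte("")))   // false: an empty key is never between
}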
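
The switch from slice.AllOf to slice.AnyOf in calculateHotDegree changes when a bucket is counted as hot: it now takes only one of the byte or key loads to exceed its threshold, matching the hot peer cache behaviour linked in the comment. A small, self-contained illustration of the difference (the threshold and load numbers are made up):

// Sketch contrasting slice.AnyOf with the previous slice.AllOf check.
package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/slice"
)

func main() {
	// Hypothetical per-dimension thresholds and observed loads:
	// high byte rate, negligible key rate.
	minHotThresholds := []float64{1024, 256}
	readLoads := []float64{4096, 0}

	anyHot := slice.AnyOf(readLoads, func(i int) bool { return readLoads[i] > minHotThresholds[i] })
	allHot := slice.AllOf(readLoads, func(i int) bool { return readLoads[i] > minHotThresholds[i] })
	// With AnyOf this bucket is treated as read-hot; the old AllOf check
	// would have ignored it because the key rate is below its threshold.
	fmt.Println(anyHot, allHot) // true false
}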