diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index 6cbd04ee671..2adcfbe7e48 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -51,6 +51,8 @@ type evictLeaderSchedulerConfig struct { syncutil.RWMutex storage endpoint.ConfigStorage StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"` + // Batch is used to generate multiple operators by one scheduling + Batch int `json:"batch"` cluster *core.BasicCluster removeSchedulerCb func(string) error } @@ -65,23 +67,10 @@ func (conf *evictLeaderSchedulerConfig) getStores() []uint64 { return stores } -func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error { - if len(args) != 1 { - return errs.ErrSchedulerConfig.FastGenByArgs("id") - } - - id, err := strconv.ParseUint(args[0], 10, 64) - if err != nil { - return errs.ErrStrconvParseUint.Wrap(err) - } - ranges, err := getKeyRanges(args[1:]) - if err != nil { - return err - } - conf.Lock() - defer conf.Unlock() - conf.StoreIDWithRanges[id] = ranges - return nil +func (conf *evictLeaderSchedulerConfig) getBatch() int { + conf.RLock() + defer conf.RUnlock() + return conf.Batch } func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { @@ -93,13 +82,12 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig { } return &evictLeaderSchedulerConfig{ StoreIDWithRanges: storeIDWithRanges, + Batch: conf.Batch, } } -func (conf *evictLeaderSchedulerConfig) Persist() error { +func (conf *evictLeaderSchedulerConfig) persistLocked() error { name := conf.getSchedulerName() - conf.RLock() - defer conf.RUnlock() data, err := EncodeConfig(conf) failpoint.Inject("persistFail", func() { err = errors.New("fail to persist") @@ -125,29 +113,29 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string { return res } -func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) { - conf.Lock() - defer conf.Unlock() +func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, error) { _, exists := conf.StoreIDWithRanges[id] - succ, last = false, false if exists { delete(conf.StoreIDWithRanges, id) conf.cluster.ResumeLeaderTransfer(id) - succ = true - last = len(conf.StoreIDWithRanges) == 0 + return len(conf.StoreIDWithRanges) == 0, nil } - return succ, last + return false, errs.ErrScheduleConfigNotExist.FastGenByArgs() } -func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { - conf.Lock() - defer conf.Unlock() +func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) { if err := conf.cluster.PauseLeaderTransfer(id); err != nil { log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err)) } conf.StoreIDWithRanges[id] = keyRange } +func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) { + conf.Lock() + defer conf.Unlock() + conf.resetStoreLocked(id, keyRange) +} + func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange { conf.RLock() defer conf.RUnlock() @@ -157,6 +145,108 @@ func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRa return nil } +func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) { + conf.RLock() + defer conf.RUnlock() + return EncodeConfig(conf) +} + +func (conf *evictLeaderSchedulerConfig) reloadConfig(name string) error { + conf.Lock() + defer conf.Unlock() + cfgData, err := 
conf.storage.LoadSchedulerConfig(name) + if err != nil { + return err + } + if len(cfgData) == 0 { + return nil + } + newCfg := &evictLeaderSchedulerConfig{} + if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { + return err + } + pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) + conf.StoreIDWithRanges = newCfg.StoreIDWithRanges + conf.Batch = newCfg.Batch + return nil +} + +func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.SchedulerCluster) error { + conf.RLock() + defer conf.RUnlock() + var res error + for id := range conf.StoreIDWithRanges { + if err := cluster.PauseLeaderTransfer(id); err != nil { + res = err + } + } + return res +} + +func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.SchedulerCluster) { + conf.RLock() + defer conf.RUnlock() + for id := range conf.StoreIDWithRanges { + cluster.ResumeLeaderTransfer(id) + } +} + +func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) { + conf.RLock() + defer conf.RUnlock() + if _, exist := conf.StoreIDWithRanges[id]; !exist { + if err := conf.cluster.PauseLeaderTransfer(id); err != nil { + return exist, err + } + } + return true, nil +} + +func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error { + conf.Lock() + defer conf.Unlock() + if id != 0 { + conf.StoreIDWithRanges[id] = newRanges + } + conf.Batch = batch + err := conf.persistLocked() + if err != nil && id != 0 { + _, _ = conf.removeStoreLocked(id) + } + return err +} + +func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) { + conf.Lock() + var resp any + last, err := conf.removeStoreLocked(id) + if err != nil { + conf.Unlock() + return resp, err + } + + keyRanges := conf.StoreIDWithRanges[id] + err = conf.persistLocked() + if err != nil { + conf.resetStoreLocked(id, keyRanges) + conf.Unlock() + return resp, err + } + if !last { + conf.Unlock() + return resp, nil + } + conf.Unlock() + if err := conf.removeSchedulerCb(EvictLeaderName); err != nil { + if !errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) { + conf.resetStore(id, keyRanges) + } + return resp, err + } + resp = lastStoreDeleteInfo + return resp, nil +} + type evictLeaderScheduler struct { *BaseScheduler conf *evictLeaderSchedulerConfig @@ -193,48 +283,19 @@ func (*evictLeaderScheduler) GetType() string { } func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { - s.conf.RLock() - defer s.conf.RUnlock() - return EncodeConfig(s.conf) + return s.conf.encodeConfig() } func (s *evictLeaderScheduler) ReloadConfig() error { - s.conf.Lock() - defer s.conf.Unlock() - cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName()) - if err != nil { - return err - } - if len(cfgData) == 0 { - return nil - } - newCfg := &evictLeaderSchedulerConfig{} - if err = DecodeConfig([]byte(cfgData), newCfg); err != nil { - return err - } - pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges) - s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges - return nil + return s.conf.reloadConfig(s.GetName()) } func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { - s.conf.RLock() - defer s.conf.RUnlock() - var res error - for id := range s.conf.StoreIDWithRanges { - if err := cluster.PauseLeaderTransfer(id); err != nil { - res = err - } - } - return res + return s.conf.pauseLeaderTransfer(cluster) } func (s *evictLeaderScheduler) CleanConfig(cluster 
sche.SchedulerCluster) { - s.conf.RLock() - defer s.conf.RUnlock() - for id := range s.conf.StoreIDWithRanges { - cluster.ResumeLeaderTransfer(id) - } + s.conf.resumeLeaderTransfer(cluster) } func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { @@ -247,7 +308,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) { evictLeaderCounter.Inc() - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil + return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf), nil } func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator { @@ -268,10 +329,12 @@ func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) [ type evictLeaderStoresConf interface { getStores() []uint64 getKeyRangesByID(id uint64) []core.KeyRange + getBatch() int } -func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator { +func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator { var ops []*operator.Operator + batchSize := conf.getBatch() for i := 0; i < batchSize; i++ { once := scheduleEvictLeaderOnce(name, typ, cluster, conf) // no more regions @@ -354,39 +417,50 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil { return } - var args []string - var exists bool - var id uint64 - idFloat, ok := input["store_id"].(float64) - if ok { + var ( + exist bool + err error + id uint64 + newRanges []core.KeyRange + ) + idFloat, inputHasStoreID := input["store_id"].(float64) + if inputHasStoreID { id = (uint64)(idFloat) - handler.config.RLock() - if _, exists = handler.config.StoreIDWithRanges[id]; !exists { - if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil { - handler.config.RUnlock() - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } + exist, err = handler.config.pauseLeaderTransferIfStoreNotExist(id) + if err != nil { + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + } + + batch := handler.config.getBatch() + batchFloat, ok := input["batch"].(float64) + if ok { + if batchFloat < 1 || batchFloat > 10 { + handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]") + return } - handler.config.RUnlock() - args = append(args, strconv.FormatUint(id, 10)) + batch = (int)(batchFloat) } ranges, ok := (input["ranges"]).([]string) if ok { - args = append(args, ranges...) - } else if exists { - args = append(args, handler.config.getRanges(id)...) 
+ if !inputHasStoreID { + handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id")) + return + } + } else if exist { + ranges = handler.config.getRanges(id) } - err := handler.config.BuildWithArgs(args) + newRanges, err = getKeyRanges(ranges) if err != nil { - handler.rd.JSON(w, http.StatusBadRequest, err.Error()) + handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - err = handler.config.Persist() + + err = handler.config.update(id, newRanges, batch) if err != nil { - handler.config.removeStore(id) handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } @@ -406,33 +480,17 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R return } - var resp any - keyRanges := handler.config.getKeyRangesByID(id) - succ, last := handler.config.removeStore(id) - if succ { - err = handler.config.Persist() - if err != nil { - handler.config.resetStore(id, keyRanges) + resp, err := handler.config.delete(id) + if err != nil { + if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) || errors.ErrorEqual(err, errs.ErrScheduleConfigNotExist.FastGenByArgs()) { + handler.rd.JSON(w, http.StatusNotFound, err.Error()) + } else { handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - if last { - if err := handler.config.removeSchedulerCb(EvictLeaderName); err != nil { - if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) { - handler.rd.JSON(w, http.StatusNotFound, err.Error()) - } else { - handler.config.resetStore(id, keyRanges) - handler.rd.JSON(w, http.StatusInternalServerError, err.Error()) - } - return - } - resp = lastStoreDeleteInfo } - handler.rd.JSON(w, http.StatusOK, resp) return } - handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error()) + handler.rd.JSON(w, http.StatusOK, resp) } func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler { diff --git a/pkg/schedule/schedulers/evict_leader_test.go b/pkg/schedule/schedulers/evict_leader_test.go index a91b1c3c937..63f7cde3b15 100644 --- a/pkg/schedule/schedulers/evict_leader_test.go +++ b/pkg/schedule/schedulers/evict_leader_test.go @@ -89,18 +89,43 @@ func TestConfigClone(t *testing.T) { emptyConf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange)} con2 := emptyConf.Clone() - re.Empty(emptyConf.getKeyRangesByID(1)) - re.NoError(con2.BuildWithArgs([]string{"1"})) - re.NotEmpty(con2.getKeyRangesByID(1)) - re.Empty(emptyConf.getKeyRangesByID(1)) + re.Empty(con2.getKeyRangesByID(1)) + con2.StoreIDWithRanges[1], _ = getKeyRanges([]string{"a", "b", "c", "d"}) con3 := con2.Clone() - con3.StoreIDWithRanges[1], _ = getKeyRanges([]string{"a", "b", "c", "d"}) - re.Empty(emptyConf.getKeyRangesByID(1)) - re.NotEqual(len(con3.getRanges(1)), len(con2.getRanges(1))) + re.Equal(len(con3.getRanges(1)), len(con2.getRanges(1))) + con3.StoreIDWithRanges[1][0].StartKey = []byte("aaa") con4 := con3.Clone() re.True(bytes.Equal(con4.StoreIDWithRanges[1][0].StartKey, con3.StoreIDWithRanges[1][0].StartKey)) - con4.StoreIDWithRanges[1][0].StartKey = []byte("aaa") - re.False(bytes.Equal(con4.StoreIDWithRanges[1][0].StartKey, con3.StoreIDWithRanges[1][0].StartKey)) + + con4.Batch = 10 + con5 := con4.Clone() + re.Equal(con5.getBatch(), con4.getBatch()) +} + +func TestBatchEvict(t *testing.T) { + re := require.New(t) + cancel, _, tc, oc := prepareSchedulersTest() + defer cancel() + + // Add stores 1, 2, 3 + tc.AddLeaderStore(1, 0) + 
tc.AddLeaderStore(2, 0) + tc.AddLeaderStore(3, 0) + // the random might be the same, so we add 1000 regions to make sure the batch is full + for i := 1; i <= 1000; i++ { + tc.AddLeaderRegion(uint64(i), 1, 2, 3) + } + tc.AddLeaderRegion(6, 2, 1, 3) + tc.AddLeaderRegion(7, 3, 1, 2) + + sl, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil }) + re.NoError(err) + re.True(sl.IsScheduleAllowed(tc)) + ops, _ := sl.Schedule(tc, false) + re.Len(ops, 3) + sl.(*evictLeaderScheduler).conf.Batch = 5 + ops, _ = sl.Schedule(tc, false) + re.Len(ops, 5) } diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index b4cc79e782a..c9f10fa610f 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -96,6 +96,10 @@ func (conf *evictSlowStoreSchedulerConfig) getKeyRangesByID(id uint64) []core.Ke return []core.KeyRange{core.NewKeyRange("", "")} } +func (*evictSlowStoreSchedulerConfig) getBatch() int { + return EvictLeaderBatchSize +} + func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 { if len(conf.getStores()) == 0 { return 0 @@ -263,7 +267,7 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerClust } func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator { - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) + return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf) } func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index da3dbc24e95..dc2266b5540 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -110,6 +110,10 @@ func (conf *evictSlowTrendSchedulerConfig) getKeyRangesByID(id uint64) []core.Ke return []core.KeyRange{core.NewKeyRange("", "")} } +func (*evictSlowTrendSchedulerConfig) getBatch() int { + return EvictLeaderBatchSize +} + func (conf *evictSlowTrendSchedulerConfig) hasEvictedStores() bool { conf.RLock() defer conf.RUnlock() @@ -370,7 +374,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus return nil } storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1) - return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize) + return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf) } func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool { diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 6bca686404d..777c8b3d625 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -137,6 +137,7 @@ func schedulersRegister() { return err } conf.StoreIDWithRanges[id] = ranges + conf.Batch = EvictLeaderBatchSize return nil } }) diff --git a/pkg/schedule/schedulers/scheduler_test.go b/pkg/schedule/schedulers/scheduler_test.go index 5a603515942..48040841c76 100644 --- a/pkg/schedule/schedulers/scheduler_test.go +++ b/pkg/schedule/schedulers/scheduler_test.go @@ -148,8 +148,8 @@ func TestRemoveRejectLeader(t *testing.T) { el, err := CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), 
ConfigSliceDecoder(EvictLeaderType, []string{"1"}), func(string) error { return nil }) re.NoError(err) tc.DeleteStore(tc.GetStore(1)) - succ, _ := el.(*evictLeaderScheduler).conf.removeStore(1) - re.True(succ) + _, err = el.(*evictLeaderScheduler).conf.removeStoreLocked(1) + re.NoError(err) } func TestShuffleHotRegionScheduleBalance(t *testing.T) { diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go index c1db24cc176..4c85bb64037 100644 --- a/tools/pd-ctl/pdctl/command/scheduler.go +++ b/tools/pd-ctl/pdctl/command/scheduler.go @@ -592,6 +592,10 @@ func newConfigEvictLeaderCommand() *cobra.Command { Use: "delete-store ", Short: "delete a store from evict leader list", Run: func(cmd *cobra.Command, args []string) { deleteStoreFromSchedulerConfig(cmd, c.Name(), args) }, + }, &cobra.Command{ + Use: "set ", + Short: "set the config item", + Run: func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) }, }) return c } diff --git a/tools/pd-ctl/tests/scheduler/scheduler_test.go b/tools/pd-ctl/tests/scheduler/scheduler_test.go index 3f58175b5fa..3a6e29f3586 100644 --- a/tools/pd-ctl/tests/scheduler/scheduler_test.go +++ b/tools/pd-ctl/tests/scheduler/scheduler_test.go @@ -140,7 +140,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *pdTests.TestCluster) { testutil.Eventually(re, func() bool { configInfo := make(map[string]any) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo) - return reflect.DeepEqual(expectedConfig, configInfo) + return reflect.DeepEqual(expectedConfig["store-id-ranges"], configInfo["store-id-ranges"]) }) } @@ -530,6 +530,27 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestClust return !strings.Contains(echo, "shuffle-hot-region-scheduler") }) + // test evict leader scheduler + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return strings.Contains(echo, "evict-leader-scheduler") + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler", "set", "batch", "5"}, nil) + re.Contains(echo, "Success!") + conf = make(map[string]any) + testutil.Eventually(re, func() bool { + mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, &conf) + return conf["batch"] == 5. + }) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) + re.Contains(echo, "Success!") + testutil.Eventually(re, func() bool { + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil) + return !strings.Contains(echo, "evict-leader-scheduler") + }) + // test balance leader config conf = make(map[string]any) conf1 := make(map[string]any)
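
Note for reviewers reading this patch in isolation: the new `batch` field defaults to `EvictLeaderBatchSize` (seeded in `init.go`; the unit test above expects 3 operators per round before it raises `conf.Batch` to 5), the HTTP handler rejects values outside [1, 10], and pd-ctl exposes it through `scheduler config evict-leader-scheduler set batch 5`, as exercised in the pd-ctl test hunk.

The sketch below is a minimal, self-contained illustration of the batch-limited loop that `scheduleEvictLeaderBatch` now drives from `conf.getBatch()` instead of a fixed argument. The names `operator`, `collectBatch`, and `scheduleOnce` are invented for the sketch and do not exist in PD; the real function works on `*operator.Operator` values produced by `scheduleEvictLeaderOnce` and deduplicates them with `uniqueAppendOperator`.

package main

import "fmt"

// operator stands in for *operator.Operator in this sketch.
type operator string

// collectBatch mirrors the shape of scheduleEvictLeaderBatch: invoke the
// per-round scheduling function at most `batch` times, stop early when a
// round yields nothing, and cap the result at `batch` operators.
func collectBatch(scheduleOnce func() []operator, batch int) []operator {
	var ops []operator
	for i := 0; i < batch; i++ {
		once := scheduleOnce()
		if len(once) == 0 {
			// no more candidate leaders to evict in this round
			break
		}
		ops = append(ops, once...)
		if len(ops) >= batch {
			break
		}
	}
	if len(ops) > batch {
		ops = ops[:batch]
	}
	return ops
}

func main() {
	n := 0
	scheduleOnce := func() []operator {
		n++
		return []operator{operator(fmt.Sprintf("evict-leader-from-region-%d", n))}
	}
	fmt.Println(len(collectBatch(scheduleOnce, 3))) // 3, matching the default asserted in TestBatchEvict
	fmt.Println(len(collectBatch(scheduleOnce, 5))) // 5, matching conf.Batch = 5 in TestBatchEvict
}

The practical effect is that raising `batch` lets a single `Schedule()` call emit more transfer-leader operators per round, which is what `TestBatchEvict` and the pd-ctl test assert end to end.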