Skip to content

Commit

Permalink
scheduler: add batch config for evict leader scheduler (#8259)
Browse files Browse the repository at this point in the history
close #8265

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: lhy1024 <liuhanyang@pingcap.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 18, 2024
1 parent 1a20c85 commit 3330a44
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 122 deletions.
274 changes: 166 additions & 108 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type evictLeaderSchedulerConfig struct {
syncutil.RWMutex
storage endpoint.ConfigStorage
StoreIDWithRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
// Batch is used to generate multiple operators by one scheduling
Batch int `json:"batch"`
cluster *core.BasicCluster
removeSchedulerCb func(string) error
}
Expand All @@ -65,23 +67,10 @@ func (conf *evictLeaderSchedulerConfig) getStores() []uint64 {
return stores
}

func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
if len(args) != 1 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
}

id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err)
}
ranges, err := getKeyRanges(args[1:])
if err != nil {
return err
}
conf.Lock()
defer conf.Unlock()
conf.StoreIDWithRanges[id] = ranges
return nil
func (conf *evictLeaderSchedulerConfig) getBatch() int {
conf.RLock()
defer conf.RUnlock()
return conf.Batch
}

func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
Expand All @@ -93,13 +82,12 @@ func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
}
return &evictLeaderSchedulerConfig{
StoreIDWithRanges: storeIDWithRanges,
Batch: conf.Batch,
}
}

func (conf *evictLeaderSchedulerConfig) Persist() error {
func (conf *evictLeaderSchedulerConfig) persistLocked() error {
name := conf.getSchedulerName()
conf.RLock()
defer conf.RUnlock()
data, err := EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
Expand All @@ -125,29 +113,29 @@ func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
return res
}

func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last bool) {
conf.Lock()
defer conf.Unlock()
func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, error) {
_, exists := conf.StoreIDWithRanges[id]
succ, last = false, false
if exists {
delete(conf.StoreIDWithRanges, id)
conf.cluster.ResumeLeaderTransfer(id)
succ = true
last = len(conf.StoreIDWithRanges) == 0
return len(conf.StoreIDWithRanges) == 0, nil
}
return succ, last
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
conf.Lock()
defer conf.Unlock()
func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
}

func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
conf.Lock()
defer conf.Unlock()
conf.resetStoreLocked(id, keyRange)
}

func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
conf.RLock()
defer conf.RUnlock()
Expand All @@ -157,6 +145,108 @@ func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRa
return nil
}

func (conf *evictLeaderSchedulerConfig) encodeConfig() ([]byte, error) {
conf.RLock()
defer conf.RUnlock()
return EncodeConfig(conf)
}

func (conf *evictLeaderSchedulerConfig) reloadConfig(name string) error {
conf.Lock()
defer conf.Unlock()
cfgData, err := conf.storage.LoadSchedulerConfig(name)
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}
newCfg := &evictLeaderSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
conf.Batch = newCfg.Batch
return nil
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.SchedulerCluster) error {
conf.RLock()
defer conf.RUnlock()
var res error
for id := range conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransfer(id); err != nil {
res = err
}
}
return res
}

func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.SchedulerCluster) {
conf.RLock()
defer conf.RUnlock()
for id := range conf.StoreIDWithRanges {
cluster.ResumeLeaderTransfer(id)
}
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) {
conf.RLock()
defer conf.RUnlock()
if _, exist := conf.StoreIDWithRanges[id]; !exist {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
return exist, err
}
}
return true, nil
}

func (conf *evictLeaderSchedulerConfig) update(id uint64, newRanges []core.KeyRange, batch int) error {
conf.Lock()
defer conf.Unlock()
if id != 0 {
conf.StoreIDWithRanges[id] = newRanges
}
conf.Batch = batch
err := conf.persistLocked()
if err != nil && id != 0 {
_, _ = conf.removeStoreLocked(id)
}
return err
}

func (conf *evictLeaderSchedulerConfig) delete(id uint64) (any, error) {
conf.Lock()
var resp any
last, err := conf.removeStoreLocked(id)
if err != nil {
conf.Unlock()
return resp, err
}

keyRanges := conf.StoreIDWithRanges[id]
err = conf.persistLocked()
if err != nil {
conf.resetStoreLocked(id, keyRanges)
conf.Unlock()
return resp, err
}
if !last {
conf.Unlock()
return resp, nil
}
conf.Unlock()
if err := conf.removeSchedulerCb(EvictLeaderName); err != nil {
if !errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
conf.resetStore(id, keyRanges)
}
return resp, err
}
resp = lastStoreDeleteInfo
return resp, nil
}

type evictLeaderScheduler struct {
*BaseScheduler
conf *evictLeaderSchedulerConfig
Expand Down Expand Up @@ -193,48 +283,19 @@ func (*evictLeaderScheduler) GetType() string {
}

func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
s.conf.RLock()
defer s.conf.RUnlock()
return EncodeConfig(s.conf)
return s.conf.encodeConfig()
}

func (s *evictLeaderScheduler) ReloadConfig() error {
s.conf.Lock()
defer s.conf.Unlock()
cfgData, err := s.conf.storage.LoadSchedulerConfig(s.GetName())
if err != nil {
return err
}
if len(cfgData) == 0 {
return nil
}
newCfg := &evictLeaderSchedulerConfig{}
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
return nil
return s.conf.reloadConfig(s.GetName())
}

func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
s.conf.RLock()
defer s.conf.RUnlock()
var res error
for id := range s.conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransfer(id); err != nil {
res = err
}
}
return res
return s.conf.pauseLeaderTransfer(cluster)
}

func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.RLock()
defer s.conf.RUnlock()
for id := range s.conf.StoreIDWithRanges {
cluster.ResumeLeaderTransfer(id)
}
s.conf.resumeLeaderTransfer(cluster)
}

func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
Expand All @@ -247,7 +308,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)

func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
evictLeaderCounter.Inc()
return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil
return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf), nil
}

func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator {
Expand All @@ -268,10 +329,12 @@ func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) [
type evictLeaderStoresConf interface {
getStores() []uint64
getKeyRangesByID(id uint64) []core.KeyRange
getBatch() int
}

func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator {
func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
var ops []*operator.Operator
batchSize := conf.getBatch()
for i := 0; i < batchSize; i++ {
once := scheduleEvictLeaderOnce(name, typ, cluster, conf)
// no more regions
Expand Down Expand Up @@ -354,39 +417,50 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
return
}
var args []string
var exists bool
var id uint64
idFloat, ok := input["store_id"].(float64)
if ok {
var (
exist bool
err error
id uint64
newRanges []core.KeyRange
)
idFloat, inputHasStoreID := input["store_id"].(float64)
if inputHasStoreID {
id = (uint64)(idFloat)
handler.config.RLock()
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
handler.config.RUnlock()
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
exist, err = handler.config.pauseLeaderTransferIfStoreNotExist(id)
if err != nil {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}

batch := handler.config.getBatch()
batchFloat, ok := input["batch"].(float64)
if ok {
if batchFloat < 1 || batchFloat > 10 {
handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]")
return
}
handler.config.RUnlock()
args = append(args, strconv.FormatUint(id, 10))
batch = (int)(batchFloat)
}

ranges, ok := (input["ranges"]).([]string)
if ok {
args = append(args, ranges...)
} else if exists {
args = append(args, handler.config.getRanges(id)...)
if !inputHasStoreID {
handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id"))
return
}
} else if exist {
ranges = handler.config.getRanges(id)
}

err := handler.config.BuildWithArgs(args)
newRanges, err = getKeyRanges(ranges)
if err != nil {
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = handler.config.Persist()

err = handler.config.update(id, newRanges, batch)
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -406,33 +480,17 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
return
}

var resp any
keyRanges := handler.config.getKeyRangesByID(id)
succ, last := handler.config.removeStore(id)
if succ {
err = handler.config.Persist()
if err != nil {
handler.config.resetStore(id, keyRanges)
resp, err := handler.config.delete(id)
if err != nil {
if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) || errors.ErrorEqual(err, errs.ErrScheduleConfigNotExist.FastGenByArgs()) {
handler.rd.JSON(w, http.StatusNotFound, err.Error())
} else {
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if last {
if err := handler.config.removeSchedulerCb(EvictLeaderName); err != nil {
if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
handler.rd.JSON(w, http.StatusNotFound, err.Error())
} else {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
}
return
}
resp = lastStoreDeleteInfo
}
handler.rd.JSON(w, http.StatusOK, resp)
return
}

handler.rd.JSON(w, http.StatusNotFound, errs.ErrScheduleConfigNotExist.FastGenByArgs().Error())
handler.rd.JSON(w, http.StatusOK, resp)
}

func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler {
Expand Down
Loading

0 comments on commit 3330a44

Please sign in to comment.