Skip to content

Commit

Permalink
schedulers: rollback the scheduler when failing to persist (#3787) (#…
Browse files Browse the repository at this point in the history
…3824)

* rollback the scheduler when failing to persist

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* address the comment

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
ti-chi-bot and rleungx authored Jul 23, 2021
1 parent d8c1f30 commit 7cba191
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 27 deletions.
22 changes: 8 additions & 14 deletions server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,11 @@ func (h *schedulerHandler) Delete(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
switch {
case strings.HasPrefix(name, schedulers.EvictLeaderName) && name != schedulers.EvictLeaderName:
if err := h.redirectSchedulerDelete(name, schedulers.EvictLeaderName); err != nil {
h.handleErr(w, err)
return
}
h.redirectSchedulerDelete(w, name, schedulers.EvictLeaderName)
return
case strings.HasPrefix(name, schedulers.GrantLeaderName) && name != schedulers.GrantLeaderName:
if err := h.redirectSchedulerDelete(name, schedulers.GrantLeaderName); err != nil {
h.handleErr(w, err)
return
}
h.redirectSchedulerDelete(w, name, schedulers.GrantLeaderName)
return
default:
if err := h.RemoveScheduler(name); err != nil {
h.handleErr(w, err)
Expand All @@ -278,18 +274,16 @@ func (h *schedulerHandler) handleErr(w http.ResponseWriter, err error) {
}
}

func (h *schedulerHandler) redirectSchedulerDelete(name, schedulerName string) error {
func (h *schedulerHandler) redirectSchedulerDelete(w http.ResponseWriter, name, schedulerName string) {
args := strings.Split(name, "-")
args = args[len(args)-1:]
url := fmt.Sprintf("%s/%s/%s/delete/%s", h.GetAddr(), schedulerConfigPrefix, schedulerName, args[0])
resp, err := doDelete(h.svr.GetHTTPClient(), url)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return errs.ErrSchedulerNotFound.FastGenByArgs()
h.r.JSON(w, resp.StatusCode, err.Error())
return
}
return nil
h.r.JSON(w, resp.StatusCode, nil)
}

// FIXME: details of input json body params
Expand Down
24 changes: 21 additions & 3 deletions server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
Expand Down Expand Up @@ -57,16 +58,27 @@ func (s *testScheduleSuite) TestOriginAPI(c *C) {
body, err := json.Marshal(input)
c.Assert(err, IsNil)
c.Assert(postJSON(testDialClient, addURL, body), IsNil)
rc := s.svr.GetRaftCluster()
c.Assert(rc.GetSchedulers(), HasLen, 1)
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, "evict-leader-scheduler")
c.Assert(readJSON(testDialClient, listURL, &resp), IsNil)
c.Assert(resp["store-id-ranges"], HasLen, 1)
input1 := make(map[string]interface{})
input1["name"] = "evict-leader-scheduler"
input1["store_id"] = 2
body, err = json.Marshal(input1)
c.Assert(err, IsNil)
c.Assert(failpoint.Enable("github.com/tikv/pd/server/schedulers/persistFail", "return(true)"), IsNil)
c.Assert(postJSON(testDialClient, addURL, body), NotNil)
c.Assert(rc.GetSchedulers(), HasLen, 1)
resp = make(map[string]interface{})
c.Assert(readJSON(testDialClient, listURL, &resp), IsNil)
c.Assert(resp["store-id-ranges"], HasLen, 1)
c.Assert(failpoint.Disable("github.com/tikv/pd/server/schedulers/persistFail"), IsNil)
c.Assert(postJSON(testDialClient, addURL, body), IsNil)
rc := s.svr.GetRaftCluster()
c.Assert(rc.GetSchedulers(), HasLen, 1)
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, "evict-leader-scheduler")
resp = make(map[string]interface{})
c.Assert(readJSON(testDialClient, listURL, &resp), IsNil)
c.Assert(resp["store-id-ranges"], HasLen, 2)
deleteURL := fmt.Sprintf("%s/%s", s.urlPrefix, "evict-leader-scheduler-1")
Expand All @@ -77,8 +89,14 @@ func (s *testScheduleSuite) TestOriginAPI(c *C) {
c.Assert(readJSON(testDialClient, listURL, &resp1), IsNil)
c.Assert(resp1["store-id-ranges"], HasLen, 1)
deleteURL = fmt.Sprintf("%s/%s", s.urlPrefix, "evict-leader-scheduler-2")
c.Assert(failpoint.Enable("github.com/tikv/pd/server/config/persistFail", "return(true)"), IsNil)
res, err := doDelete(testDialClient, deleteURL)
c.Assert(err, IsNil)
c.Assert(res.StatusCode, Equals, 500)
c.Assert(rc.GetSchedulers(), HasLen, 1)
c.Assert(failpoint.Disable("github.com/tikv/pd/server/config/persistFail"), IsNil)
res, err = doDelete(testDialClient, deleteURL)
c.Assert(err, IsNil)
c.Assert(res.StatusCode, Equals, 200)
c.Assert(rc.GetSchedulers(), HasLen, 0)
resp2 := make(map[string]interface{})
Expand Down
16 changes: 7 additions & 9 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,28 +604,26 @@ func (c *coordinator) removeScheduler(name string) error {
return errs.ErrSchedulerNotFound.FastGenByArgs()
}

s.Stop()
schedulerStatusGauge.WithLabelValues(name, "allow").Set(0)
delete(c.schedulers, name)

var err error
opt := c.cluster.opt

if err = c.removeOptScheduler(opt, name); err != nil {
if err := c.removeOptScheduler(opt, name); err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
return err
}

if err = opt.Persist(c.cluster.storage); err != nil {
if err := opt.Persist(c.cluster.storage); err != nil {
log.Error("the option can not persist scheduler config", errs.ZapError(err))
return err
}

if err = c.cluster.storage.RemoveScheduleConfig(name); err != nil {
if err := c.cluster.storage.RemoveScheduleConfig(name); err != nil {
log.Error("can not remove the scheduler config", errs.ZapError(err))
return err
}

s.Stop()
schedulerStatusGauge.WithLabelValues(name, "allow").Set(0)
delete(c.schedulers, name)

return nil
}

Expand Down
8 changes: 7 additions & 1 deletion server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"unsafe"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
Expand Down Expand Up @@ -566,7 +568,11 @@ func (o *PersistOptions) Persist(storage *core.Storage) error {
LabelProperty: o.GetLabelPropertyConfig(),
ClusterVersion: *o.GetClusterVersion(),
}
return storage.SaveConfig(cfg)
err := storage.SaveConfig(cfg)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
return err
}

// Reload reloads the configuration from the storage.
Expand Down
27 changes: 27 additions & 0 deletions server/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -116,6 +117,9 @@ func (conf *evictLeaderSchedulerConfig) Persist() error {
conf.mu.RLock()
defer conf.mu.RUnlock()
data, err := schedule.EncodeConfig(conf)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
})
if err != nil {
return err
}
Expand Down Expand Up @@ -151,6 +155,22 @@ func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
return succ, last
}

func (conf *evictLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
conf.mu.Lock()
defer conf.mu.Unlock()
conf.cluster.PauseLeaderTransfer(id)
conf.StoreIDWithRanges[id] = keyRange
}

func (conf *evictLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
conf.mu.RLock()
defer conf.mu.RUnlock()
if ranges, exist := conf.StoreIDWithRanges[id]; exist {
return ranges
}
return nil
}

type evictLeaderScheduler struct {
*BaseScheduler
conf *evictLeaderSchedulerConfig
Expand Down Expand Up @@ -296,12 +316,15 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
idFloat, ok := input["store_id"].(float64)
if ok {
id = (uint64)(idFloat)
handler.config.mu.RLock()
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
handler.config.mu.RUnlock()
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
handler.config.mu.RUnlock()
args = append(args, strconv.FormatUint(id, 10))
}

Expand All @@ -315,6 +338,7 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
handler.config.BuildWithArgs(args)
err := handler.config.Persist()
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -335,10 +359,12 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
}

var resp interface{}
keyRanges := handler.config.getKeyRangesByID(id)
succ, last := handler.config.removeStore(id)
if succ {
err = handler.config.Persist()
if err != nil {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -347,6 +373,7 @@ func (handler *evictLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
handler.rd.JSON(w, http.StatusNotFound, err.Error())
} else {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
}
return
Expand Down
23 changes: 23 additions & 0 deletions server/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
return succ, last
}

func (conf *grantLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
conf.mu.Lock()
defer conf.mu.Unlock()
conf.cluster.PauseLeaderTransfer(id)
conf.StoreIDWithRanges[id] = keyRange
}

func (conf *grantLeaderSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
conf.mu.RLock()
defer conf.mu.RUnlock()
if ranges, exist := conf.StoreIDWithRanges[id]; exist {
return ranges
}
return nil
}

// grantLeaderScheduler transfers all leaders to peers in the store.
type grantLeaderScheduler struct {
*BaseScheduler
Expand Down Expand Up @@ -250,12 +266,15 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
idFloat, ok := input["store_id"].(float64)
if ok {
id = (uint64)(idFloat)
handler.config.mu.RLock()
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
if err := handler.config.cluster.PauseLeaderTransfer(id); err != nil {
handler.config.mu.RUnlock()
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
handler.config.mu.RUnlock()
args = append(args, strconv.FormatUint(id, 10))
}

Expand All @@ -269,6 +288,7 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
handler.config.BuildWithArgs(args)
err := handler.config.Persist()
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -289,10 +309,12 @@ func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
}

var resp interface{}
keyRanges := handler.config.getKeyRangesByID(id)
succ, last := handler.config.removeStore(id)
if succ {
err = handler.config.Persist()
if err != nil {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -301,6 +323,7 @@ func (handler *grantLeaderHandler) DeleteConfig(w http.ResponseWriter, r *http.R
if errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
handler.rd.JSON(w, http.StatusNotFound, err.Error())
} else {
handler.config.resetStore(id, keyRanges)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
}
return
Expand Down

0 comments on commit 7cba191

Please sign in to comment.