unsafe recovery: Introduce auto-detect mode for online recovery (#5403)
close #5415

Support auto-detecting failed stores for online recovery

Signed-off-by: Connor1996 <zbk602423539@gmail.com>
Connor1996 authored Aug 11, 2022
1 parent 0791fe8 commit 8321f10
Showing 5 changed files with 187 additions and 74 deletions.
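
For context, a rough usage sketch (not part of this commit): after this change, the remove-failed-stores request body accepts either an explicit "stores" list or the new "auto-detect" flag. The snippet below posts both forms with Go's standard library. The endpoint prefix /pd/api/v1/admin/unsafe, the PD address, and the store IDs are illustrative assumptions; the field names "stores", "timeout", and "auto-detect" come from the handler diff below.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// postRemoveFailedStores sends one remove-failed-stores request to PD and
// prints the HTTP status. The path prefix is an assumption, not taken from
// this diff.
func postRemoveFailedStores(pdAddr string, body map[string]interface{}) error {
	data, err := json.Marshal(body)
	if err != nil {
		return err
	}
	resp, err := http.Post(pdAddr+"/pd/api/v1/admin/unsafe/remove-failed-stores",
		"application/json", bytes.NewReader(data))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
	return nil
}

func main() {
	pd := "http://127.0.0.1:2379" // assumed local PD client URL

	// Explicit mode: list the failed store IDs; "timeout" is in seconds and
	// the handler falls back to 600 when it is omitted.
	_ = postRemoveFailedStores(pd, map[string]interface{}{
		"stores":  []uint64{3, 4},
		"timeout": 300,
	})

	// Auto-detect mode: no store list is required; PD infers failed stores
	// from stores that never report during the collect-report stage.
	_ = postRemoveFailedStores(pd, map[string]interface{}{
		"auto-detect": true,
	})
}
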
25 changes: 15 additions & 10 deletions server/api/unsafe_operation.go
@@ -48,24 +48,29 @@ func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *ht
rc := getCluster(r)
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
storeSlice, ok := typeutil.JSONToUint64Slice(input["stores"])
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "Store ids are invalid")
return
}

stores := make(map[uint64]struct{})
for _, store := range storeSlice {
stores[store] = struct{}{}
autoDetect, exists := input["auto-detect"].(bool)
if !exists || !autoDetect {
storeSlice, ok := typeutil.JSONToUint64Slice(input["stores"])
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "Store ids are invalid")
return
}
for _, store := range storeSlice {
stores[store] = struct{}{}
}
}

timeout := uint64(600)
rawTimeout, exists := input["timeout"].(float64)
if exists {
if rawTimeout, exists := input["timeout"].(float64); exists {
timeout = uint64(rawTimeout)
}

if err := rc.GetUnsafeRecoveryController().RemoveFailedStores(stores, timeout); err != nil {
if err := rc.GetUnsafeRecoveryController().RemoveFailedStores(stores, timeout, autoDetect); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
22 changes: 19 additions & 3 deletions server/api/unsafe_operation_test.go
@@ -37,7 +37,7 @@ func TestUnsafeOperationTestSuite(t *testing.T) {
suite.Run(t, new(unsafeOperationTestSuite))
}

func (suite *unsafeOperationTestSuite) SetupSuite() {
func (suite *unsafeOperationTestSuite) SetupTest() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
server.MustWaitLeader(re, []*server.Server{suite.svr})
@@ -49,14 +49,15 @@ func (suite *unsafeOperationTestSuite) SetupSuite() {
mustPutStore(re, suite.svr, 1, metapb.StoreState_Offline, metapb.NodeState_Removing, nil)
}

func (suite *unsafeOperationTestSuite) TearDownSuite() {
func (suite *unsafeOperationTestSuite) TearDownTest() {
suite.cleanup()
}

func (suite *unsafeOperationTestSuite) TestRemoveFailedStores() {
re := suite.Require()

input := map[string]interface{}{"stores": []uint64{}}
data, _ := json.Marshal(input)
re := suite.Require()
err := tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/remove-failed-stores", data, tu.StatusNotOK(re),
tu.StringEqual(re, "\"[PD:unsaferecovery:ErrUnsafeRecoveryInvalidInput]invalid input no store specified\"\n"))
suite.NoError(err)
@@ -83,3 +84,18 @@ func (suite *unsafeOperationTestSuite) TestRemoveFailedStores() {
err = tu.ReadGetJSON(re, testDialClient, suite.urlPrefix+"/remove-failed-stores/show", &output)
suite.NoError(err)
}

func (suite *unsafeOperationTestSuite) TestRemoveFailedStoresAutoDetect() {
re := suite.Require()

input := map[string]interface{}{"auto-detect": false}
data, _ := json.Marshal(input)
err := tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/remove-failed-stores", data, tu.StatusNotOK(re),
tu.StringEqual(re, "\"Store ids are invalid\"\n"))
suite.NoError(err)

input = map[string]interface{}{"auto-detect": true}
data, _ = json.Marshal(input)
err = tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/remove-failed-stores", data, tu.StatusOK(re))
suite.NoError(err)
}
75 changes: 47 additions & 28 deletions server/cluster/unsafe_recovery_controller.go
Expand Up @@ -110,6 +110,7 @@ type unsafeRecoveryController struct {
step uint64
failedStores map[uint64]struct{}
timeout time.Time
autoDetect bool

// collected reports from store, if not reported yet, it would be nil
storeReports map[uint64]*pdpb.StoreReport
@@ -164,30 +165,32 @@ func (u *unsafeRecoveryController) IsRunning() bool {
}

// RemoveFailedStores removes failed stores from the cluster.
func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64) error {
func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64, autoDetect bool) error {
if u.IsRunning() {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
u.Lock()
defer u.Unlock()

if len(failedStores) == 0 {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
}
if !autoDetect {
if len(failedStores) == 0 {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
}

// validate the stores and mark the store as tombstone forcibly
for failedStore := range failedStores {
store := u.cluster.GetStore(failedStore)
if store == nil {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs(fmt.Sprintf("store %v doesn't exist", failedStore))
} else if (store.IsPreparing() || store.IsServing()) && !store.IsDisconnected() {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs(fmt.Sprintf("store %v is up and connected", failedStore))
// validate the stores and mark the store as tombstone forcibly
for failedStore := range failedStores {
store := u.cluster.GetStore(failedStore)
if store == nil {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs(fmt.Sprintf("store %v doesn't exist", failedStore))
} else if (store.IsPreparing() || store.IsServing()) && !store.IsDisconnected() {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs(fmt.Sprintf("store %v is up and connected", failedStore))
}
}
}
for failedStore := range failedStores {
err := u.cluster.BuryStore(failedStore, true)
if err != nil && !errors.ErrorEqual(err, errs.ErrStoreNotFound.FastGenByArgs(failedStore)) {
return err
for failedStore := range failedStores {
err := u.cluster.BuryStore(failedStore, true)
if err != nil && !errors.ErrorEqual(err, errs.ErrStoreNotFound.FastGenByArgs(failedStore)) {
return err
}
}
}

@@ -204,6 +207,7 @@ func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]st

u.timeout = time.Now().Add(time.Duration(timeout) * time.Second)
u.failedStores = failedStores
u.autoDetect = autoDetect
u.changeStage(collectReport)
return nil
}
@@ -384,7 +388,7 @@ func (u *unsafeRecoveryController) dispatchPlan(heartbeat *pdpb.StoreHeartbeatRe

if reported, exist := u.storeReports[storeID]; reported != nil || !exist {
// the plan has been executed, no need to dispatch again
// or no need to displan plan to this store(e.g. Tiflash)
// or no need to dispatch plan to this store(e.g. Tiflash)
return
}

@@ -445,17 +449,23 @@ func (u *unsafeRecoveryController) changeStage(stage unsafeRecoveryStage) {
switch u.stage {
case idle:
case collectReport:
stores := ""
count := 0
for store := range u.failedStores {
count += 1
stores += fmt.Sprintf("%d", store)
if count != len(u.failedStores) {
stores += ", "
// TODO: clean up existing operators
output.Info = "Unsafe recovery enters collect report stage"
if u.autoDetect {
output.Details = append(output.Details, "auto detect mode with no specified failed stores")
} else {
stores := ""
count := 0
for store := range u.failedStores {
count += 1
stores += fmt.Sprintf("%d", store)
if count != len(u.failedStores) {
stores += ", "
}
}
output.Details = append(output.Details, fmt.Sprintf("failed stores %s", stores))
}
// TODO: clean up existing operators
output.Info = fmt.Sprintf("Unsafe recovery enters collect report stage: failed stores %s", stores)

case tombstoneTiFlashLearner:
output.Info = "Unsafe recovery enters tombstone TiFlash learner stage"
output.Actions = u.getTombstoneTiFlashLearnerDigest()
@@ -615,13 +625,22 @@ func (u *unsafeRecoveryController) recordAffectedRegion(region *metapb.Region) {
}
}

func (u *unsafeRecoveryController) isFailed(peer *metapb.Peer) bool {
_, isFailed := u.failedStores[peer.StoreId]
_, isLive := u.storeReports[peer.StoreId]
if isFailed || (u.autoDetect && !isLive) {
return true
}
return false
}

func (u *unsafeRecoveryController) canElectLeader(region *metapb.Region, onlyIncoming bool) bool {
hasQuorum := func(voters []*metapb.Peer) bool {
numFailedVoters := 0
numLiveVoters := 0

for _, voter := range voters {
if _, ok := u.failedStores[voter.StoreId]; ok {
if u.isFailed(voter) {
numFailedVoters += 1
} else {
numLiveVoters += 1
@@ -657,7 +676,7 @@ func (u *unsafeRecoveryController) getFailedPeers(region *metapb.Region) []*meta
if peer.Role == metapb.PeerRole_Learner || peer.Role == metapb.PeerRole_DemotingVoter {
continue
}
if _, ok := u.failedStores[peer.StoreId]; ok {
if u.isFailed(peer) {
failedPeers = append(failedPeers, peer)
}
}