Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unsafe recovery: Introduce auto-detect mode for online recovery #5403

Merged
merged 9 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions server/api/unsafe_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,29 @@ func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *ht
rc := getCluster(r)
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
storeSlice, ok := typeutil.JSONToUint64Slice(input["stores"])
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "Store ids are invalid")
return
}

stores := make(map[uint64]struct{})
for _, store := range storeSlice {
stores[store] = struct{}{}
autoDetect, exists := input["auto-detect"].(bool)
if !exists || !autoDetect {
storeSlice, ok := typeutil.JSONToUint64Slice(input["stores"])
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "Store ids are invalid")
return
}
for _, store := range storeSlice {
stores[store] = struct{}{}
}
}

timeout := uint64(600)
rawTimeout, exists := input["timeout"].(float64)
if exists {
if rawTimeout, exists := input["timeout"].(float64); exists {
timeout = uint64(rawTimeout)
}

if err := rc.GetUnsafeRecoveryController().RemoveFailedStores(stores, timeout); err != nil {
if err := rc.GetUnsafeRecoveryController().RemoveFailedStores(stores, timeout, autoDetect); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down
22 changes: 19 additions & 3 deletions server/api/unsafe_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestUnsafeOperationTestSuite(t *testing.T) {
suite.Run(t, new(unsafeOperationTestSuite))
}

func (suite *unsafeOperationTestSuite) SetupSuite() {
func (suite *unsafeOperationTestSuite) SetupTest() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
server.MustWaitLeader(re, []*server.Server{suite.svr})
Expand All @@ -49,14 +49,15 @@ func (suite *unsafeOperationTestSuite) SetupSuite() {
mustPutStore(re, suite.svr, 1, metapb.StoreState_Offline, metapb.NodeState_Removing, nil)
}

func (suite *unsafeOperationTestSuite) TearDownSuite() {
func (suite *unsafeOperationTestSuite) TearDownTest() {
suite.cleanup()
}

func (suite *unsafeOperationTestSuite) TestRemoveFailedStores() {
re := suite.Require()

input := map[string]interface{}{"stores": []uint64{}}
data, _ := json.Marshal(input)
re := suite.Require()
err := tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/remove-failed-stores", data, tu.StatusNotOK(re),
tu.StringEqual(re, "\"[PD:unsaferecovery:ErrUnsafeRecoveryInvalidInput]invalid input no store specified\"\n"))
suite.NoError(err)
Expand All @@ -83,3 +84,18 @@ func (suite *unsafeOperationTestSuite) TestRemoveFailedStores() {
err = tu.ReadGetJSON(re, testDialClient, suite.urlPrefix+"/remove-failed-stores/show", &output)
suite.NoError(err)
}

// TestRemoveFailedStoresAutoDetect exercises the auto-detect flag of the
// remove-failed-stores API: with the flag false and no store list the request
// is rejected, while with the flag true it succeeds without any stores.
func (suite *unsafeOperationTestSuite) TestRemoveFailedStoresAutoDetect() {
	re := suite.Require()

	// auto-detect disabled and no "stores" field: the store list is invalid.
	payload, _ := json.Marshal(map[string]interface{}{"auto-detect": false})
	suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/remove-failed-stores", payload, tu.StatusNotOK(re),
		tu.StringEqual(re, "\"Store ids are invalid\"\n")))

	// auto-detect enabled: the request succeeds with no stores specified.
	payload, _ = json.Marshal(map[string]interface{}{"auto-detect": true})
	suite.NoError(tu.CheckPostJSON(testDialClient, suite.urlPrefix+"/remove-failed-stores", payload, tu.StatusOK(re)))
}
75 changes: 47 additions & 28 deletions server/cluster/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type unsafeRecoveryController struct {
step uint64
failedStores map[uint64]struct{}
timeout time.Time
autoDetect bool

// collected reports from store, if not reported yet, it would be nil
storeReports map[uint64]*pdpb.StoreReport
Expand Down Expand Up @@ -164,30 +165,32 @@ func (u *unsafeRecoveryController) IsRunning() bool {
}

// RemoveFailedStores removes failed stores from the cluster.
func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64) error {
func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64, autoDetect bool) error {
if u.IsRunning() {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
u.Lock()
defer u.Unlock()

if len(failedStores) == 0 {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
}
if !autoDetect {
if len(failedStores) == 0 {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
}

// validate the stores and mark the store as tombstone forcibly
for failedStore := range failedStores {
store := u.cluster.GetStore(failedStore)
if store == nil {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs(fmt.Sprintf("store %v doesn't exist", failedStore))
} else if (store.IsPreparing() || store.IsServing()) && !store.IsDisconnected() {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs(fmt.Sprintf("store %v is up and connected", failedStore))
// validate the stores and mark the store as tombstone forcibly
for failedStore := range failedStores {
store := u.cluster.GetStore(failedStore)
if store == nil {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs(fmt.Sprintf("store %v doesn't exist", failedStore))
} else if (store.IsPreparing() || store.IsServing()) && !store.IsDisconnected() {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs(fmt.Sprintf("store %v is up and connected", failedStore))
}
}
}
for failedStore := range failedStores {
err := u.cluster.BuryStore(failedStore, true)
if err != nil && !errors.ErrorEqual(err, errs.ErrStoreNotFound.FastGenByArgs(failedStore)) {
return err
for failedStore := range failedStores {
err := u.cluster.BuryStore(failedStore, true)
if err != nil && !errors.ErrorEqual(err, errs.ErrStoreNotFound.FastGenByArgs(failedStore)) {
return err
}
}
}

Expand All @@ -204,6 +207,7 @@ func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]st

u.timeout = time.Now().Add(time.Duration(timeout) * time.Second)
u.failedStores = failedStores
u.autoDetect = autoDetect
u.changeStage(collectReport)
return nil
}
Expand Down Expand Up @@ -384,7 +388,7 @@ func (u *unsafeRecoveryController) dispatchPlan(heartbeat *pdpb.StoreHeartbeatRe

if reported, exist := u.storeReports[storeID]; reported != nil || !exist {
// the plan has been executed, no need to dispatch again
// or no need to displan plan to this store(e.g. Tiflash)
// or no need to dispatch plan to this store(e.g. Tiflash)
return
}

Expand Down Expand Up @@ -445,17 +449,23 @@ func (u *unsafeRecoveryController) changeStage(stage unsafeRecoveryStage) {
switch u.stage {
case idle:
case collectReport:
stores := ""
count := 0
for store := range u.failedStores {
count += 1
stores += fmt.Sprintf("%d", store)
if count != len(u.failedStores) {
stores += ", "
// TODO: clean up existing operators
output.Info = "Unsafe recovery enters collect report stage"
if u.autoDetect {
output.Details = append(output.Details, "auto detect mode with no specified failed stores")
} else {
stores := ""
count := 0
for store := range u.failedStores {
count += 1
stores += fmt.Sprintf("%d", store)
if count != len(u.failedStores) {
stores += ", "
}
}
output.Details = append(output.Details, fmt.Sprintf("failed stores %s", stores))
}
// TODO: clean up existing operators
output.Info = fmt.Sprintf("Unsafe recovery enters collect report stage: failed stores %s", stores)

case tombstoneTiFlashLearner:
output.Info = "Unsafe recovery enters tombstone TiFlash learner stage"
output.Actions = u.getTombstoneTiFlashLearnerDigest()
Expand Down Expand Up @@ -615,13 +625,22 @@ func (u *unsafeRecoveryController) recordAffectedRegion(region *metapb.Region) {
}
}

func (u *unsafeRecoveryController) isFailed(peer *metapb.Peer) bool {
_, isFailed := u.failedStores[peer.StoreId]
_, isLive := u.storeReports[peer.StoreId]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to wait for at least a round of store heartbeat?

Copy link
Member Author

@Connor1996 Connor1996 Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't avoid that for all the corner cases, add caveat in the flag comment

if isFailed || (u.autoDetect && !isLive) {
return true
}
return false
}

func (u *unsafeRecoveryController) canElectLeader(region *metapb.Region, onlyIncoming bool) bool {
hasQuorum := func(voters []*metapb.Peer) bool {
numFailedVoters := 0
numLiveVoters := 0

for _, voter := range voters {
if _, ok := u.failedStores[voter.StoreId]; ok {
if u.isFailed(voter) {
numFailedVoters += 1
} else {
numLiveVoters += 1
Expand Down Expand Up @@ -657,7 +676,7 @@ func (u *unsafeRecoveryController) getFailedPeers(region *metapb.Region) []*meta
if peer.Role == metapb.PeerRole_Learner || peer.Role == metapb.PeerRole_DemotingVoter {
continue
}
if _, ok := u.failedStores[peer.StoreId]; ok {
if u.isFailed(peer) {
failedPeers = append(failedPeers, peer)
}
}
Expand Down
Loading