Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

region: Add accelerate regions schedule API #2599

Merged
merged 2 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package api

import (
"container/heap"
"encoding/hex"
"fmt"
"net/http"
"net/url"
"sort"
Expand All @@ -24,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/pd/v4/pkg/apiutil"
"github.com/pingcap/pd/v4/server"
"github.com/pingcap/pd/v4/server/core"
"github.com/unrolled/render"
Expand Down Expand Up @@ -557,6 +560,73 @@ func (h *regionsHandler) GetTopSize(w http.ResponseWriter, r *http.Request) {
})
}

// @Tags region
// @Summary Accelerate regions scheduling a in given range, only receive hex format for keys
// @Accept json
// @Param body body object true "json params"
// @Param limit query integer false "Limit count" default(256)
// @Produce json
// @Success 200 {string} string "Accelerate regions scheduling in a given range[startKey,endKey)"
// @Failure 400 {string} string "The input is invalid."
// @Router /regions/accelerate-schedule [post]
func (h *regionsHandler) AccelerateRegionsScheduleInRange(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r.Context())
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}
parseKey := func(name string, input map[string]interface{}) (string, string, error) {
k, ok := input[name]
if !ok {
return "", "", fmt.Errorf("missing %s", name)
}
rawKey, ok := k.(string)
if !ok {
return "", "", fmt.Errorf("bad format %s", name)
}
returned, err := hex.DecodeString(rawKey)
if err != nil {
return "", "", fmt.Errorf("split key %s is not in hex format", name)
}
return string(returned), rawKey, nil
}

startKey, rawStartKey, err := parseKey("start_key", input)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

endKey, rawEndKey, err := parseKey("end_key", input)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

limit := 256
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
var err error
limit, err = strconv.Atoi(limitStr)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
}
if limit > maxRegionLimit {
limit = maxRegionLimit
}

regions := rc.ScanRegions([]byte(startKey), []byte(endKey), limit)
if len(regions) > 0 {
regionsIDList := make([]uint64, 0, len(regions))
for _, region := range regions {
regionsIDList = append(regionsIDList, region.GetID())
}
rc.AddSuspectRegions(regionsIDList...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if repeatedly adding the same regions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The duplicated operators would be discarded due to checkAddOperator (due to same priority) in AddWaitingOperator if the operators added in the first time haven't been successes yet. Is that true? @disksing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think true.

}
h.rd.Text(w, http.StatusOK, fmt.Sprintf("Accelerate regions scheduling in a given range [%s,%s)", rawStartKey, rawEndKey))
}

func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) {
rc := getCluster(r.Context())
limit := defaultRegionLimit
Expand Down
16 changes: 16 additions & 0 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"bytes"
"encoding/hex"
"fmt"
"math/rand"
"net/url"
Expand Down Expand Up @@ -258,6 +259,21 @@ func (s *testRegionSuite) TestTopSize(c *C) {
s.checkTopRegions(c, fmt.Sprintf("%s/regions/size?limit=%d", s.urlPrefix, 2), []uint64{7, 8})
}

func (s *testRegionSuite) TestAccelerateRegionsScheduleInRange(c *C) {
r1 := newTestRegionInfo(557, 13, []byte("a1"), []byte("a2"))
r2 := newTestRegionInfo(558, 14, []byte("a2"), []byte("a3"))
r3 := newTestRegionInfo(559, 15, []byte("a3"), []byte("a4"))
mustRegionHeartbeat(c, s.svr, r1)
mustRegionHeartbeat(c, s.svr, r2)
mustRegionHeartbeat(c, s.svr, r3)
body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")))

err := postJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule", s.urlPrefix), []byte(body))
c.Assert(err, IsNil)
rleungx marked this conversation as resolved.
Show resolved Hide resolved
idList := s.svr.GetRaftCluster().GetSuspectRegions()
c.Assert(len(idList), Equals, 2)
}

func (s *testRegionSuite) checkTopRegions(c *C, url string, regionIDs []uint64) {
regions := &RegionsInfo{}
err := readJSON(testDialClient, url, regions)
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func createRouter(ctx context.Context, prefix string, svr *server.Server) *mux.R
clusterRouter.HandleFunc("/regions/check/hist-size", regionsHandler.GetSizeHistogram).Methods("GET")
clusterRouter.HandleFunc("/regions/check/hist-keys", regionsHandler.GetKeysHistogram).Methods("GET")
clusterRouter.HandleFunc("/regions/sibling/{id}", regionsHandler.GetRegionSiblings).Methods("GET")
clusterRouter.HandleFunc("/regions/accelerate-schedule", regionsHandler.AccelerateRegionsScheduleInRange).Methods("POST")

apiRouter.Handle("/version", newVersionHandler(rd)).Methods("GET")
apiRouter.Handle("/status", newStatusHandler(svr, rd)).Methods("GET")
Expand Down