Skip to content

Commit 3a01315

Browse files
committed
Merge branch 'feature/add-region-max-size' of github.com:bufferflies/pd into feature/add-region-max-size
2 parents de13935 + dc2c9f7 commit 3a01315

File tree

18 files changed

+207
-54
lines changed

18 files changed

+207
-54
lines changed

client/Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ GO_TOOLS_BIN_PATH := $(shell pwd)/../.tools/bin
1616
PATH := $(GO_TOOLS_BIN_PATH):$(PATH)
1717
SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)
1818

19+
default: static tidy test
20+
1921
test:
2022
CGO_ENABLE=1 go test -race -cover
2123

pd.code-workspace

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"folders": [
3+
{
4+
"path": "."
5+
},
6+
{
7+
"path": "client"
8+
}
9+
],
10+
"settings": {}
11+
}

pkg/mock/mockcluster/mockcluster.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (mc *Cluster) GetOpts() *config.PersistOptions {
8080
return mc.PersistOptions
8181
}
8282

83-
// GetConfig returns the cluster immutable configuration.
83+
// GetImmutableCfg returns the cluster immutable configuration.
8484
func (mc *Cluster) GetImmutableCfg() *config.ImmutableConfig {
8585
return mc.ImmutableConfig
8686
}

server/api/operator.go

+27
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package api
1717
import (
1818
"net/http"
1919
"strconv"
20+
"time"
2021

2122
"github.com/gorilla/mux"
2223
"github.com/tikv/pd/pkg/apiutil"
@@ -344,6 +345,32 @@ func (h *operatorHandler) Delete(w http.ResponseWriter, r *http.Request) {
344345
h.r.JSON(w, http.StatusOK, "The pending operator is canceled.")
345346
}
346347

348+
// @Tags operator
349+
// @Summary lists the finished operators since the given timestamp in second.
350+
// @Param from query integer false "From Unix timestamp"
351+
// @Produce json
352+
// @Success 200 {object} []operator.OpRecord
353+
// @Failure 400 {string} string "The request is invalid."
354+
// @Failure 500 {string} string "PD server failed to proceed the request."
355+
// @Router /operators/records [get]
356+
func (h *operatorHandler) Records(w http.ResponseWriter, r *http.Request) {
357+
var from time.Time
358+
if fromStr := r.URL.Query()["from"]; len(fromStr) > 0 {
359+
fromInt, err := strconv.ParseInt(fromStr[0], 10, 64)
360+
if err != nil {
361+
h.r.JSON(w, http.StatusBadRequest, err.Error())
362+
return
363+
}
364+
from = time.Unix(fromInt, 0)
365+
}
366+
records, err := h.GetRecords(from)
367+
if err != nil {
368+
h.r.JSON(w, http.StatusInternalServerError, err.Error())
369+
return
370+
}
371+
h.r.JSON(w, http.StatusOK, records)
372+
}
373+
347374
func parseStoreIDsAndPeerRole(ids interface{}, roles interface{}) (map[uint64]placement.PeerRoleType, bool) {
348375
items, ok := ids.([]interface{})
349376
if !ok {

server/api/operator_test.go

+14
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ import (
1919
"errors"
2020
"fmt"
2121
"io"
22+
"strconv"
2223
"strings"
24+
"time"
2325

2426
. "github.com/pingcap/check"
2527
"github.com/pingcap/failpoint"
@@ -79,6 +81,9 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {
7981
regionURL := fmt.Sprintf("%s/operators/%d", s.urlPrefix, region.GetId())
8082
operator := mustReadURL(c, regionURL)
8183
c.Assert(strings.Contains(operator, "operator not found"), IsTrue)
84+
recordURL := fmt.Sprintf("%s/operators/records?from=%s", s.urlPrefix, strconv.FormatInt(time.Now().Unix(), 10))
85+
records := mustReadURL(c, recordURL)
86+
c.Assert(strings.Contains(records, "operator not found"), IsTrue)
8287

8388
mustPutStore(c, s.svr, 3, metapb.StoreState_Up, nil)
8489
err := postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"add-peer", "region_id": 1, "store_id": 3}`))
@@ -89,6 +94,8 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {
8994

9095
_, err = doDelete(testDialClient, regionURL)
9196
c.Assert(err, IsNil)
97+
records = mustReadURL(c, recordURL)
98+
c.Assert(strings.Contains(records, "admin-add-peer {add peer: store [3]}"), IsTrue)
9299

93100
err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"remove-peer", "region_id": 1, "store_id": 2}`))
94101
c.Assert(err, IsNil)
@@ -98,6 +105,8 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {
98105

99106
_, err = doDelete(testDialClient, regionURL)
100107
c.Assert(err, IsNil)
108+
records = mustReadURL(c, recordURL)
109+
c.Assert(strings.Contains(records, "admin-remove-peer {rm peer: store [2]}"), IsTrue)
101110

102111
mustPutStore(c, s.svr, 4, metapb.StoreState_Up, nil)
103112
err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"add-learner", "region_id": 1, "store_id": 4}`))
@@ -114,6 +123,11 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {
114123
c.Assert(err, NotNil)
115124
err = postJSON(testDialClient, fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [1, 2, 3]}`))
116125
c.Assert(err, NotNil)
126+
127+
// Fail to get operator if from is latest.
128+
time.Sleep(time.Second)
129+
records = mustReadURL(c, fmt.Sprintf("%s/operators/records?from=%s", s.urlPrefix, strconv.FormatInt(time.Now().Unix(), 10)))
130+
c.Assert(strings.Contains(records, "operator not found"), IsTrue)
117131
}
118132

119133
func (s *testOperatorSuite) TestMergeRegionOperator(c *C) {

server/api/router.go

+1
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
150150
operatorHandler := newOperatorHandler(handler, rd)
151151
registerFunc(apiRouter, "GetOperators", "/operators", operatorHandler.List, setMethods("GET"))
152152
registerFunc(apiRouter, "SetOperators", "/operators", operatorHandler.Post, setMethods("POST"))
153+
registerFunc(apiRouter, "GetOperatorRecords", "/operators/records", operatorHandler.Records, setMethods("GET"))
153154
registerFunc(apiRouter, "GetRegionOperator", "/operators/{region_id}", operatorHandler.Get, setMethods("GET"))
154155
registerFunc(apiRouter, "DeleteRegionOperator", "/operators/{region_id}", operatorHandler.Delete, setMethods("DELETE"))
155156

server/cluster/cluster.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,6 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
330330
case <-ticker.C:
331331
c.checkStores()
332332
c.collectMetrics()
333-
c.coordinator.opController.PruneHistory()
334333
}
335334
}
336335
}
@@ -492,7 +491,7 @@ func (c *RaftCluster) GetOpts() *config.PersistOptions {
492491
return c.opt
493492
}
494493

495-
// GetConfig gets the cluster configuration.
494+
// GetImmutableCfg gets the cluster Immutable configuration.
496495
func (c *RaftCluster) GetImmutableCfg() *config.ImmutableConfig {
497496
return c.immutableCfg
498497
}

server/config/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
600600
c.Security.Encryption.Adjust()
601601

602602
adjustUint64(&c.MaxRegionSize, defaultMaxRegionSize)
603-
adjustUint64(&c.MaxRegionSize, defaultMaxSplitSize)
603+
adjustUint64(&c.MaxSplitSize, defaultMaxSplitSize)
604604

605605
return nil
606606
}

server/config/config_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -524,5 +524,4 @@ func (s *testConfigSuite) TestImmutableConfig(c *C) {
524524
iconfig = NewImmutableConfig(config)
525525
c.Assert(iconfig.GetMaxRegionSize(), Equals, uint64(200))
526526
c.Assert(iconfig.GetMaxRegionSize(), Equals, uint64(300))
527-
528527
}

server/grpc_service.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1410,8 +1410,8 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR
14101410
return &pdpb.GetOperatorResponse{
14111411
Header: s.header(),
14121412
RegionId: requestID,
1413-
Desc: []byte(r.Op.Desc()),
1414-
Kind: []byte(r.Op.Kind().String()),
1413+
Desc: []byte(r.Desc()),
1414+
Kind: []byte(r.Kind().String()),
14151415
Status: r.Status,
14161416
}, nil
14171417
}

server/handler.go

+13
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,19 @@ func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) {
457457
return c.GetHistory(start), nil
458458
}
459459

460+
// GetRecords returns finished operators since start.
461+
func (h *Handler) GetRecords(from time.Time) ([]*operator.OpRecord, error) {
462+
c, err := h.GetOperatorController()
463+
if err != nil {
464+
return nil, err
465+
}
466+
records := c.GetRecords(from)
467+
if len(records) == 0 {
468+
return nil, ErrOperatorNotFound
469+
}
470+
return records, nil
471+
}
472+
460473
// SetAllStoresLimit is used to set limit of all stores.
461474
func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error {
462475
c, err := h.GetRaftCluster()

server/schedule/checker/merge_checker_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,14 @@ func (s *testMergeCheckerSuite) TestBasic(c *C) {
114114
c.Assert(ops[1].RegionID(), Equals, s.regions[1].GetID())
115115

116116
// change the max region size
117-
s.mc.config = config.NewImmutableConfig(config.NewConfig(), config.WithMaxRegionSize(200))
117+
cfg := config.NewConfig()
118+
cfg.MaxRegionSize = 200
119+
s.mc.config = config.NewImmutableConfig(cfg)
118120
s.cluster.PutRegion(s.regions[1].Clone(core.SetApproximateSize(200)))
119121
ops = s.mc.Check(s.regions[2])
120122
c.Assert(ops, NotNil)
121-
s.mc.config = config.NewImmutableConfig(config.NewConfig(), config.WithMaxRegionSize(96))
123+
cfg.MaxRegionSize = 96
124+
s.mc.config = config.NewImmutableConfig(cfg)
122125
// Test the peer store check.
123126
store := s.cluster.GetStore(1)
124127
c.Assert(store, NotNil)

server/schedule/operator/operator.go

+31
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,37 @@ func (o *Operator) History() []OpHistory {
357357
return histories
358358
}
359359

360+
// OpRecord is used to log and visualize completed operators.
361+
type OpRecord struct {
362+
*Operator
363+
FinishTime time.Time
364+
duration time.Duration
365+
}
366+
367+
func (o *OpRecord) String() string {
368+
return fmt.Sprintf("%s (finishAt:%v, duration:%v)", o.Operator.String(), o.FinishTime, o.duration)
369+
}
370+
371+
// MarshalJSON returns the status of operator as a JSON string
372+
func (o *OpRecord) MarshalJSON() ([]byte, error) {
373+
return []byte(`"` + o.String() + `"`), nil
374+
}
375+
376+
// Record transfers the operator to OpRecord.
377+
func (o *Operator) Record(finishTime time.Time) *OpRecord {
378+
step := atomic.LoadInt32(&o.currentStep)
379+
record := &OpRecord{
380+
Operator: o,
381+
FinishTime: finishTime,
382+
}
383+
start := o.GetStartTime()
384+
if o.Status() != SUCCESS && 0 < step && int(step-1) < len(o.stepsTime) {
385+
start = time.Unix(0, o.stepsTime[int(step-1)])
386+
}
387+
record.duration = finishTime.Sub(start)
388+
return record
389+
}
390+
360391
// GetAdditionalInfo returns additional info with string
361392
func (o *Operator) GetAdditionalInfo() string {
362393
if len(o.AdditionalInfos) != 0 {

server/schedule/operator/operator_test.go

+9
Original file line numberDiff line numberDiff line change
@@ -418,3 +418,12 @@ func (s *testOperatorSuite) TestSchedulerKind(c *C) {
418418
c.Assert(v.op.SchedulerKind(), Equals, v.expect)
419419
}
420420
}
421+
422+
func (s *testOperatorSuite) TestRecord(c *C) {
423+
operator := s.newTestOperator(1, OpLeader, AddLearner{ToStore: 1, PeerID: 1}, RemovePeer{FromStore: 1, PeerID: 1})
424+
now := time.Now()
425+
time.Sleep(time.Second)
426+
ob := operator.Record(now)
427+
c.Assert(ob.FinishTime, Equals, now)
428+
c.Assert(ob.duration.Seconds(), Greater, time.Second.Seconds())
429+
}

0 commit comments

Comments
 (0)