Merge branch 'release-4.0' into cherry-pick-4078-to-release-4.0
HunDunDM authored Sep 8, 2021
2 parents e35fe75 + 20099bd commit 1dfee1a
Showing 9 changed files with 169 additions and 25 deletions.
11 changes: 11 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -75,6 +75,7 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {

// GetStoresStats gets stores statistics.
func (mc *Cluster) GetStoresStats() *statistics.StoresStats {
mc.StoresStats.FilterUnhealthyStore(mc)
return mc.StoresStats
}

@@ -515,6 +516,16 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
mc.PutStore(newStore)
}

// SetStoreEvictLeader sets whether the given store evicts leaders.
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
store := mc.GetStore(storeID)
if enableEvictLeader {
mc.PutStore(store.Clone(core.SetStoreBlock()))
} else {
mc.PutStore(store.Clone(core.SetStoreUnBlock()))
}
}

func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, []uint64{}, nil)
}
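
Note: the helper above toggles the store's block flag via core.SetStoreBlock/SetStoreUnBlock, which is what the hot scheduler later checks through store.IsBlocked(). A minimal, hypothetical test sketch of its use (the import paths assume the release-4.0 module layout and are not part of this diff):

package schedulers_test

import (
	"testing"

	"github.com/pingcap/pd/v4/pkg/mock/mockcluster"
	"github.com/pingcap/pd/v4/pkg/mock/mockoption"
)

func TestEvictLeaderMockSketch(t *testing.T) {
	tc := mockcluster.NewCluster(mockoption.NewScheduleOptions())
	tc.AddLabelsStore(5, 0, map[string]string{"zone": "z2", "host": "h5"})
	// Mark store 5 as evicting leaders; summaryStoresLoad (see hot_region.go below)
	// will then exclude it from leader-load expectations.
	tc.SetStoreEvictLeader(5, true)
	// ... exercise a scheduler against tc ...
	tc.SetStoreEvictLeader(5, false) // restore the store for later cases
}
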
7 changes: 6 additions & 1 deletion server/api/admin.go
@@ -14,6 +14,7 @@
package api

import (
"encoding/json"
"io/ioutil"
"net/http"
"strconv"
@@ -95,14 +96,18 @@ func (h *adminHandler) ResetTS(w http.ResponseWriter, r *http.Request) {
}

// Intentionally no swagger mark as it is supposed to be only used in
// server-to-server.
// server-to-server. For security reasons, it only accepts JSON-formatted data.
func (h *adminHandler) persistFile(w http.ResponseWriter, r *http.Request) {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
h.rd.Text(w, http.StatusInternalServerError, "")
return
}
defer r.Body.Close()
if !json.Valid(data) {
h.rd.Text(w, http.StatusBadRequest, "body should be json format")
return
}
err = h.svr.PersistFile(mux.Vars(r)["file_name"], data)
if err != nil {
h.rd.Text(w, http.StatusInternalServerError, err.Error())
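
With the json.Valid guard above, persist-file rejects any non-JSON body before replicating it. A hedged client-side sketch (the host and the /pd/api/v1 prefix are placeholders; only the /admin/persist-file/{file_name} route is taken from this diff and the test below):

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Placeholder address and assumed API prefix; the route matches admin_test.go below.
	url := "http://127.0.0.1:2379/pd/api/v1/admin/persist-file/good.json"
	body := bytes.NewReader([]byte(`{"foo":"bar"}`))
	resp, err := http.Post(url, "application/json", body)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	// A body that is not valid JSON would now get 400 "body should be json format".
	fmt.Println("status:", resp.Status)
}
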
9 changes: 9 additions & 0 deletions server/api/admin_test.go
@@ -83,6 +83,15 @@ func (s *testAdminSuite) TestDropRegion(c *C) {
c.Assert(region.GetRegionEpoch().Version, Equals, uint64(50))
}

func (s *testAdminSuite) TestPersistFile(c *C) {
data := []byte("#!/bin/sh\nrm -rf /")
err := postJSON(testDialClient, s.urlPrefix+"/admin/persist-file/fun.sh", data)
c.Assert(err, NotNil)
data = []byte(`{"foo":"bar"}`)
err = postJSON(testDialClient, s.urlPrefix+"/admin/persist-file/good.json", data)
c.Assert(err, IsNil)
}

var _ = Suite(&testTSOSuite{})

type testTSOSuite struct {
10 changes: 6 additions & 4 deletions server/schedule/checker_controller.go
@@ -58,6 +58,7 @@ func (c *CheckerController) CheckRegion(region *core.RegionInfo) (bool, []*opera
if op := c.ruleChecker.Check(region); op != nil {
return checkerIsBusy, []*operator.Operator{op}
}
} else {
operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.GetType(), operator.OpReplica.String()).Inc()
}
} else {
@@ -69,19 +70,20 @@ func (c *CheckerController) CheckRegion(region *core.RegionInfo) (bool, []*opera
if op := c.replicaChecker.Check(region); op != nil {
return checkerIsBusy, []*operator.Operator{op}
}
} else {
operator.OperatorLimitCounter.WithLabelValues(c.replicaChecker.GetType(), operator.OpReplica.String()).Inc()
}
}

if c.mergeChecker != nil {
allowed := opController.OperatorCount(operator.OpMerge) < c.cluster.GetMergeScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(c.mergeChecker.GetType(), operator.OpMerge.String()).Inc()
} else {
if opController.OperatorCount(operator.OpMerge) < c.cluster.GetMergeScheduleLimit() {
checkerIsBusy = false
if ops := c.mergeChecker.Check(region); ops != nil {
// It makes sure that the two merge operators can be added together successfully.
return checkerIsBusy, ops
}
} else {
operator.OperatorLimitCounter.WithLabelValues(c.mergeChecker.GetType(), operator.OpMerge.String()).Inc()
}
}
return checkerIsBusy, nil
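
The rearranged branches above add an else path so that a checker throttled by its schedule limit bumps OperatorLimitCounter instead of being skipped silently. A self-contained sketch of that pattern with plain Go values standing in for the PD types (all names here are illustrative):

package main

import "fmt"

func main() {
	limitSkips := map[string]int{} // stand-in for operator.OperatorLimitCounter
	pendingMergeOps, mergeScheduleLimit := 9, 8

	if allowed := pendingMergeOps < mergeScheduleLimit; !allowed {
		// Throttled: record the skipped check so limit pressure is visible in metrics.
		limitSkips["merge-checker"]++
	} else {
		// Not throttled: run the checker and return its operators, as in CheckRegion above.
	}
	fmt.Println(limitSkips) // map[merge-checker:1]
}
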
46 changes: 26 additions & 20 deletions server/schedulers/hot_region.go
@@ -189,15 +189,16 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesStat := cluster.GetStoresStats()

minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
{ // update read statistics
regionRead := cluster.RegionReadStats()
storeByte := storesStat.GetStoresBytesReadStat()
storeKey := storesStat.GetStoresKeysReadStat()
hotRegionThreshold := getHotRegionThreshold(storesStat, read)
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storeByte,
storeKey,
h.pendingSums[readLeader],
@@ -213,6 +214,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
storeKey := storesStat.GetStoresKeysWriteStat()
hotRegionThreshold := getHotRegionThreshold(storesStat, write)
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storeByte,
storeKey,
h.pendingSums[writeLeader],
@@ -222,6 +224,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
write, core.LeaderKind, mixed)

h.stLoadInfos[writePeer] = summaryStoresLoad(
stores,
storeByte,
storeKey,
h.pendingSums[writePeer],
@@ -291,6 +294,7 @@ func (h *hotScheduler) gcRegionPendings() {

// Load information of all available stores.
func summaryStoresLoad(
stores []*core.StoreInfo,
storeByteRate map[uint64]float64,
storeKeyRate map[uint64]float64,
pendings map[uint64]Influence,
@@ -307,7 +311,15 @@ func summaryStoresLoad(
allCount := 0.0

// Stores without byte rate statistics are not available to schedule.
for id, byteRate := range storeByteRate {
for _, store := range stores {
id := store.GetID()
byteRate, ok := storeByteRate[id]
if !ok {
continue
}
if kind == core.LeaderKind && store.IsBlocked() {
continue
}
keyRate := storeKeyRate[id]

// Find all hot peers first
@@ -349,6 +361,7 @@

// Construct store load info.
loadDetail[id] = &storeLoadDetail{
Store: store,
LoadPred: stLoadPred,
HotPeers: hotPeers,
}
@@ -611,10 +624,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail)
for id, detail := range bs.stLoadDetail {
if bs.cluster.GetStore(id) == nil {
log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore))
continue
}
if len(detail.HotPeers) == 0 {
continue
}
@@ -744,18 +753,15 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
var (
filters []filter.Filter
candidates []*core.StoreInfo
candidates []*storeLoadDetail
)
switch bs.opTy {
case movePeer:
var scoreGuard filter.Filter
if bs.cluster.IsPlacementRulesEnabled() {
scoreGuard = filter.NewRuleFitFilter(bs.sche.GetName(), bs.cluster, bs.cur.region, bs.cur.srcStoreID)
} else {
srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
if srcStore == nil {
return nil
}
srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
scoreGuard = filter.NewDistinctScoreFilter(bs.sche.GetName(), bs.cluster.GetLocationLabels(), bs.cluster.GetRegionStores(bs.cur.region), srcStore)
}

@@ -767,8 +773,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
scoreGuard,
}

for storeID := range bs.stLoadDetail {
candidates = append(candidates, bs.cluster.GetStore(storeID))
for _, detail := range bs.stLoadDetail {
candidates = append(candidates, detail)
}

case transferLeader:
@@ -778,9 +784,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filter.NewSpecialUseFilter(bs.sche.GetName(), filter.SpecialUseHotRegion),
}

for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
if _, ok := bs.stLoadDetail[store.GetID()]; ok {
candidates = append(candidates, store)
for _, peer := range bs.cur.region.GetFollowers() {
if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
candidates = append(candidates, detail)
}
}

@@ -790,15 +796,15 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
return bs.pickDstStores(filters, candidates)
}

func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
for _, store := range candidates {
for _, detail := range candidates {
store := detail.Store
if filter.Target(bs.cluster, store, filters) {
detail := bs.stLoadDetail[store.GetID()]
if detail.LoadPred.max().ByteRate*dstToleranceRatio < detail.LoadPred.Future.ExpByteRate &&
detail.LoadPred.max().KeyRate*dstToleranceRatio < detail.LoadPred.Future.ExpKeyRate {
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
ret[store.GetID()] = detail
balanceHotRegionCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10), bs.rwTy.String()).Inc()
}
balanceHotRegionCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10), bs.rwTy.String()).Inc()
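
The reworked summaryStoresLoad above now walks the cluster's stores and keeps only those that both report byte-rate statistics and, for leader load, are not blocked (e.g. by evict-leader), so down and evicting stores no longer dilute the expectations. A standalone sketch of that eligibility rule (the types and values here are illustrative, not PD's):

package main

import "fmt"

type store struct {
	id      uint64
	blocked bool // corresponds to StoreInfo.IsBlocked() in the hunk above
}

// eligible mirrors the two "continue" guards added to summaryStoresLoad.
func eligible(s store, byteRate map[uint64]float64, leaderKind bool) bool {
	if _, ok := byteRate[s.id]; !ok {
		return false // no byte-rate statistics, e.g. a down store
	}
	if leaderKind && s.blocked {
		return false // evicting leaders, so it takes no leader load
	}
	return true
}

func main() {
	byteRate := map[uint64]float64{1: 7.5e6, 5: 0} // store 7 reports nothing
	fmt.Println(eligible(store{id: 1}, byteRate, true))                // true
	fmt.Println(eligible(store{id: 5, blocked: true}, byteRate, true)) // false: evict-leader
	fmt.Println(eligible(store{id: 7}, byteRate, true))                // false: down
}
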
106 changes: 106 additions & 0 deletions server/schedulers/hot_test.go
@@ -622,6 +622,112 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
hb.(*hotScheduler).clearPendingInfluence()
}

func (s *testHotWriteRegionSchedulerSuite) TestExpect(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
opt.HotRegionCacheHitsThreshold = 0
sche, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
hb := sche.(*hotScheduler)
// Add TiKV stores 1, 2, 3, 4, 5, 6, 7(Down) with region counts 3, 3, 2, 2, 0, 0, 0.
storeCount := uint64(7)
downStoreID := uint64(7)
tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 3, map[string]string{"zone": "z2", "host": "h2"})
tc.AddLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"})
tc.AddLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"})
tc.AddLabelsStore(5, 0, map[string]string{"zone": "z2", "host": "h5"})
tc.AddLabelsStore(6, 0, map[string]string{"zone": "z5", "host": "h6"})
tc.AddLabelsStore(7, 0, map[string]string{"zone": "z5", "host": "h7"})
for i := uint64(1); i <= storeCount; i++ {
if i != downStoreID {
tc.UpdateStorageWrittenBytes(i, 0)
}
}

//| region_id | leader_store | follower_store | follower_store | written_bytes |
//|-----------|--------------|----------------|----------------|---------------|
//| 1 | 1 | 2 | 3 | 512 KB |
//| 2 | 1 | 3 | 4 | 512 KB |
//| 3 | 1 | 2 | 4 | 512 KB |
//| 4 | 2 | | | 100 B |
// Region 1, 2 and 3 are hot regions.
testRegions := []testRegionInfo{
{1, []uint64{1, 2, 3}, 512 * KB, 5 * KB},
{2, []uint64{1, 3, 4}, 512 * KB, 5 * KB},
{3, []uint64{1, 2, 4}, 512 * KB, 5 * KB},
{4, []uint64{2}, 100, 1},
}
addRegionInfo(tc, write, testRegions)
regionBytesSum := 0.0
regionKeysSum := 0.0
hotRegionBytesSum := 0.0
hotRegionKeysSum := 0.0
for _, r := range testRegions {
regionBytesSum += r.byteRate
regionKeysSum += r.keyRate
}
for _, r := range testRegions[0:3] {
hotRegionBytesSum += r.byteRate
hotRegionKeysSum += r.keyRate
}
for i := 0; i < 20; i++ {
hb.clearPendingInfluence()
op := hb.Schedule(tc)[0]
testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 1)
}
//| store_id | write_bytes_rate |
//|----------|------------------|
//| 1 | 7.5MB |
//| 2 | 4.5MB |
//| 3 | 4.5MB |
//| 4 | 6MB |
//| 5 | 0MB(Evict)|
//| 6 | 0MB |
//| 7 | n/a (Down)|
storesBytes := map[uint64]uint64{
1: 7.5 * MB * statistics.StoreHeartBeatReportInterval,
2: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
3: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
4: 6 * MB * statistics.StoreHeartBeatReportInterval,
}
tc.SetStoreEvictLeader(5, true)
tikvBytesSum, tikvKeysSum, tikvQuerySum := 0.0, 0.0, 0.0
for i := uint64(1); i <= storeCount; i++ {
tikvBytesSum += float64(storesBytes[i]) / 10
tikvKeysSum += float64(storesBytes[i]/100) / 10
tikvQuerySum += float64(storesBytes[i]/100) / 10
}
for i := uint64(1); i <= storeCount; i++ {
if i != downStoreID {
tc.UpdateStorageWrittenBytes(i, storesBytes[i])
}
}
{ // Check the load expect
aliveTiKVCount := storeCount
allowLeaderTiKVCount := aliveTiKVCount - 2 // store 5 with evict leader, store 7 is down
c.Assert(len(hb.Schedule(tc)) == 0, IsFalse)
c.Assert(nearlyAbout(
hb.stLoadInfos[writeLeader][1].LoadPred.Current.ExpByteRate,
hotRegionBytesSum/float64(allowLeaderTiKVCount)),
IsTrue)
c.Assert(nearlyAbout(
hb.stLoadInfos[writeLeader][1].LoadPred.Current.ExpKeyRate,
hotRegionKeysSum/float64(allowLeaderTiKVCount)),
IsTrue)
}
}
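
For reference, the numbers behind the expectation check above work out as follows (a sketch, not part of the test): regions 1-3 write 512 KB/s each, and of the 7 stores only 5 may still take leaders, since store 7 is down and store 5 evicts leaders.

package main

import "fmt"

func main() {
	const kb = 1024.0
	hotRegionBytesSum := 3 * 512 * kb // regions 1, 2 and 3 write 512 KB/s each
	allowLeaderStores := 5.0          // 7 stores - store 7 (down) - store 5 (evict-leader)
	// Matches hotRegionBytesSum/float64(allowLeaderTiKVCount) asserted in TestExpect above.
	fmt.Printf("expected per-store leader byte rate: %.1f KB/s\n",
		hotRegionBytesSum/allowLeaderStores/kb) // 307.2 KB/s
}
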

func nearlyAbout(f1, f2 float64) bool {
// Treat two rates as nearly equal when they differ by less than 0.1 KB in either direction.
return f1-f2 < 0.1*KB && f2-f1 < 0.1*KB
}

func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
3 changes: 3 additions & 0 deletions server/schedulers/shuffle_hot_region.go
@@ -132,12 +132,14 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op
}

func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
stores := cluster.GetStores()
storesStats := cluster.GetStoresStats()
minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
switch typ {
case read:
hotRegionThreshold := getHotRegionThreshold(storesStats, read)
s.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storesStats.GetStoresBytesReadStat(),
storesStats.GetStoresKeysReadStat(),
map[uint64]Influence{},
@@ -149,6 +151,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []
case write:
hotRegionThreshold := getHotRegionThreshold(storesStats, write)
s.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storesStats.GetStoresBytesWriteStat(),
storesStats.GetStoresKeysWriteStat(),
map[uint64]Influence{},
1 change: 1 addition & 0 deletions server/schedulers/utils.go
@@ -342,6 +342,7 @@ func maxLoad(a, b *storeLoad) *storeLoad {
}

type storeLoadDetail struct {
Store *core.StoreInfo
LoadPred *storeLoadPred
HotPeers []*statistics.HotPeerStat
}
1 change: 1 addition & 0 deletions server/server.go
@@ -1245,6 +1245,7 @@ func (s *Server) reloadConfigFromKV() error {

// ReplicateFileToAllMembers is used to synchronize state among all members.
// Each member will write `data` to a local file named `name`.
// For security reasons, the data should be in JSON format.
func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error {
resp, err := s.GetMembers(ctx, nil)
if err != nil {
