Merge branch 'master' into balancer-set
lhy1024 authored Apr 24, 2023
2 parents f027e50 + 08b919a commit cdf0106
Showing 22 changed files with 605 additions and 152 deletions.
3 changes: 2 additions & 1 deletion client/client.go
@@ -424,9 +424,10 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
oldTSOSvcDiscovery.Close()
}
}
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", c.serviceMode.String()),
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

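The change to setServiceMode above records the previous mode before the field is overwritten, so the "old-mode" log field no longer repeats the value that was just assigned. A minimal, self-contained sketch of the pattern — `Mode` and `client` here are stand-ins, not the real pd client types:

```go
package main

import (
	"fmt"
	"log"
)

// Mode is a stand-in for pdpb.ServiceMode; this is a sketch, not the real PD client.
type Mode int

func (m Mode) String() string { return fmt.Sprintf("mode-%d", int(m)) }

type client struct{ serviceMode Mode }

// setServiceMode captures the old mode before overwriting the field, so the
// "old-mode" log value is not accidentally the mode that was just set.
func (c *client) setServiceMode(newMode Mode) {
	oldMode := c.serviceMode
	c.serviceMode = newMode
	log.Printf("service mode changed: old-mode=%s new-mode=%s", oldMode, newMode)
}

func main() {
	c := &client{serviceMode: Mode(1)}
	c.setServiceMode(Mode(2)) // logs old-mode=mode-1 new-mode=mode-2
}
```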
14 changes: 8 additions & 6 deletions pkg/core/storelimit/limit_test.go
@@ -18,6 +18,7 @@ import (
"container/list"
"context"
"math/rand"
"sync/atomic"
"testing"
"time"

@@ -125,21 +126,22 @@ func TestFeedback(t *testing.T) {
}
// region size is 10GB, snapshot write limit is 100MB/s and the snapshot concurrency is 3.
// the best strategy is that the tikv executing queue equals the wait.
regionSize, limit, wait := int64(10000), int64(100), int64(4)
iter := 100
const regionSize, limit, wait = int64(10000), int64(100), int64(4)
var iter atomic.Int32
iter.Store(100)
ops := make(chan int64, 10)
ctx, cancel := context.WithCancel(context.Background())

// generate the operator
go func() {
for {
if s.Available(regionSize, SendSnapshot, constant.Low) && iter > 0 {
iter--
if s.Available(regionSize, SendSnapshot, constant.Low) && iter.Load() > 0 {
iter.Add(-1)
size := regionSize - rand.Int63n(regionSize/10)
s.Take(size, SendSnapshot, constant.Low)
ops <- size
}
if iter == 0 {
if iter.Load() == 0 {
cancel()
return
}
@@ -185,7 +187,7 @@ func TestFeedback(t *testing.T) {
err := exec*wait - cost
queue.Remove(first)
s.Feedback(float64(err))
if iter < 5 {
if iter.Load() < 5 {
re.Greater(float64(s.GetCap()), float64(regionSize*(wait-2)))
re.Less(float64(s.GetCap()), float64(regionSize*wait))
}
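The limit_test.go change swaps a plain int counter, shared between the test goroutine and the operator-generating goroutine, for an atomic.Int32, which removes the data race on `iter`. A minimal sketch of the same pattern, with made-up timings rather than the real test's snapshot logic:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	// Shared iteration budget. Using atomic.Int32 instead of a plain int keeps
	// the producer goroutine and the reader below race-free under `go test -race`.
	var iter atomic.Int32
	iter.Store(100)

	done := make(chan struct{})
	go func() {
		defer close(done)
		for iter.Load() > 0 {
			iter.Add(-1) // consume one unit of the budget
			time.Sleep(time.Millisecond)
		}
	}()

	// The reader can safely observe progress concurrently.
	for iter.Load() > 0 {
		time.Sleep(10 * time.Millisecond)
	}
	<-done
	fmt.Println("remaining:", iter.Load()) // 0
}
```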
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
@@ -470,7 +470,7 @@ func (s *Server) startServer() (err error) {
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -137,8 +137,8 @@ func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {
}

// BucketsStats returns hot region's buckets stats.
func (mc *Cluster) BucketsStats(degree int) map[uint64][]*buckets.BucketStat {
task := buckets.NewCollectBucketStatsTask(degree)
func (mc *Cluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*buckets.BucketStat {
task := buckets.NewCollectBucketStatsTask(degree, regions...)
if !mc.HotBucketCache.CheckAsync(task) {
return nil
}
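BucketsStats on the mock now takes optional region IDs, matching the BucketStatInformer interface change in pkg/statistics/buckets further down, so callers can restrict the bucket stats to the regions they are about to act on. A self-contained sketch of that variadic-filter shape, using stand-in types rather than the real cluster:

```go
package main

import "fmt"

// BucketStat is a stand-in for buckets.BucketStat in this sketch.
type BucketStat struct{ HotDegree int }

// BucketStatInformer mirrors the updated interface shape: an optional list
// of region IDs narrows the result.
type BucketStatInformer interface {
	BucketsStats(degree int, regions ...uint64) map[uint64][]*BucketStat
}

type fakeCluster map[uint64][]*BucketStat

func (c fakeCluster) BucketsStats(degree int, regions ...uint64) map[uint64][]*BucketStat {
	if len(regions) == 0 {
		return c // no filter: return everything, as before
	}
	out := make(map[uint64][]*BucketStat, len(regions))
	for _, id := range regions {
		if stats, ok := c[id]; ok {
			out[id] = stats
		}
	}
	return out
}

func main() {
	var informer BucketStatInformer = fakeCluster{
		1: {{HotDegree: 4}},
		2: {{HotDegree: 6}},
	}
	fmt.Println(len(informer.BucketsStats(3)))    // 2: all regions
	fmt.Println(len(informer.BucketsStats(3, 2))) // 1: only region 2
}
```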
5 changes: 5 additions & 0 deletions pkg/schedule/operator/operator.go
@@ -103,6 +103,11 @@ func (o *Operator) String() string {
return s
}

// Brief returns the operator's short brief.
func (o *Operator) Brief() string {
return o.brief
}

// MarshalJSON serializes custom types to JSON.
func (o *Operator) MarshalJSON() ([]byte, error) {
return []byte(`"` + o.String() + `"`), nil
130 changes: 114 additions & 16 deletions pkg/schedule/schedulers/hot_region.go
@@ -15,7 +15,9 @@
package schedulers

import (
"bytes"
"fmt"

"math"
"math/rand"
"net/http"
@@ -24,6 +26,7 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/core"
@@ -35,24 +38,30 @@ import (
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/keyutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)

var (
statisticsInterval = time.Second
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip")
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
hotSchedulerSkipCounter = schedulerCounter.WithLabelValues(HotRegionName, "skip")
hotSchedulerSearchRevertRegionsCounter = schedulerCounter.WithLabelValues(HotRegionName, "search_revert_regions")
hotSchedulerNotSameEngineCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_same_engine")
hotSchedulerNoRegionCounter = schedulerCounter.WithLabelValues(HotRegionName, "no_region")
hotSchedulerUnhealthyReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "unhealthy_replica")
hotSchedulerAbnormalReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "abnormal_replica")
hotSchedulerCreateOperatorFailedCounter = schedulerCounter.WithLabelValues(HotRegionName, "create_operator_failed")
hotSchedulerNewOperatorCounter = schedulerCounter.WithLabelValues(HotRegionName, "new_operator")
hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

// counter related with the split region
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerSearchRevertRegionsCounter = schedulerCounter.WithLabelValues(HotRegionName, "search_revert_regions")
hotSchedulerNotSameEngineCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_same_engine")
hotSchedulerNoRegionCounter = schedulerCounter.WithLabelValues(HotRegionName, "no_region")
hotSchedulerUnhealthyReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "unhealthy_replica")
hotSchedulerAbnormalReplicaCounter = schedulerCounter.WithLabelValues(HotRegionName, "abnormal_replica")
hotSchedulerCreateOperatorFailedCounter = schedulerCounter.WithLabelValues(HotRegionName, "create_operator_failed")
hotSchedulerNewOperatorCounter = schedulerCounter.WithLabelValues(HotRegionName, "new_operator")
hotSchedulerSnapshotSenderLimit = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
@@ -196,6 +205,8 @@ var (
// If the distribution of a dimension is below the corresponding stddev threshold, then scheduling will no longer be based on this dimension,
// as it implies that this dimension is sufficiently uniform.
stddevThreshold = 0.1

splitBucket = "split-hot-region"
)

type hotScheduler struct {
@@ -634,11 +645,8 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
} else if bs.opTy == movePeer {
if bs.cur.region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() {
hotSchedulerNeedSplitBeforeScheduleCounter.Inc()
continue
} else if !snapshotFilter.Select(bs.cur.region).IsOK() {
hotSchedulerSnapshotSenderLimit.Inc()
if !snapshotFilter.Select(bs.cur.region).IsOK() {
hotSchedulerSnapshotSenderLimitCounter.Inc()
continue
}
}
@@ -717,7 +725,7 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
return false
}
// revert peers
if bs.best.revertPeerStat != nil {
if bs.best.revertPeerStat != nil && len(bs.ops) > 1 {
infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
return false
@@ -1341,6 +1349,22 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
return nil
}

splitRegions := make([]*core.RegionInfo, 0)
if bs.opTy == movePeer {
for _, region := range []*core.RegionInfo{bs.cur.region, bs.cur.revertRegion} {
if region == nil {
continue
}
if region.GetApproximateSize() > bs.GetOpts().GetMaxMovableHotPeerSize() {
hotSchedulerNeedSplitBeforeScheduleCounter.Inc()
splitRegions = append(splitRegions, region)
}
}
}
if len(splitRegions) > 0 {
return bs.createSplitOperator(splitRegions)
}

srcStoreID := bs.cur.srcStore.GetID()
dstStoreID := bs.cur.dstStore.GetID()
sourceLabel := strconv.FormatUint(srcStoreID, 10)
@@ -1377,6 +1401,64 @@ func (bs *balanceSolver) buildOperators() (ops []*operator.Operator) {
return
}

// createSplitOperator creates split operators for the given regions.
func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*operator.Operator {
if len(regions) == 0 {
return nil
}
ids := make([]uint64, len(regions))
for i, region := range regions {
ids[i] = region.GetID()
}
hotBuckets := bs.Cluster.BucketsStats(bs.minHotDegree, ids...)
operators := make([]*operator.Operator, 0)

createFunc := func(region *core.RegionInfo) {
stats, ok := hotBuckets[region.GetID()]
if !ok {
hotSchedulerRegionBucketsNotHotCounter.Inc()
return
}
startKey, endKey := region.GetStartKey(), region.GetEndKey()
splitKey := make([][]byte, 0)
for _, stat := range stats {
if keyutil.Between(startKey, endKey, stat.StartKey) && keyutil.Between(startKey, endKey, stat.EndKey) {
if len(splitKey) == 0 {
splitKey = append(splitKey, stat.StartKey, stat.EndKey)
continue
}
// If the last split key is equal to the current start key, we can merge them.
// E.g. [a, b), [b, c) -> [a, c) split keys is [a,c]
// Otherwise, we should append the current start key and end key.
// E.g. [a, b), [c, d) -> [a, b), [c, d) split keys is [a,b,c,d]
if bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) {
splitKey[len(splitKey)-1] = stat.EndKey
} else {
splitKey = append(splitKey, stat.StartKey, stat.EndKey)
}
}
}
if len(splitKey) == 0 {
hotSchedulerNotFoundSplitKeysCounter.Inc()
return
}
op, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, splitKey)
if err != nil {
log.Error("fail to create split operator",
zap.Stringer("resource-type", bs.resourceTy),
errs.ZapError(err))
return
}
hotSchedulerSplitSuccessCounter.Inc()
operators = append(operators, op)
}

for _, region := range regions {
createFunc(region)
}
return operators
}

func (bs *balanceSolver) createReadOperator(region *core.RegionInfo, srcStoreID, dstStoreID uint64) (op *operator.Operator, typ string, err error) {
if region.GetStorePeer(dstStoreID) != nil {
typ = "transfer-leader"
@@ -1550,6 +1632,22 @@ const (
resourceTypeLen
)

// String implements fmt.Stringer interface.
func (ty resourceType) String() string {
switch ty {
case writePeer:
return "write-peer"
case writeLeader:
return "write-leader"
case readPeer:
return "read-peer"
case readLeader:
return "read-leader"
default:
return ""
}
}

func toResourceType(rwTy statistics.RWType, opTy opType) resourceType {
switch rwTy {
case statistics.Write:
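The new createSplitOperator path above is taken when buildOperators finds a movePeer candidate whose approximate size exceeds max-movable-hot-peer-size: instead of moving the oversized peer, the scheduler splits the region along its hot buckets, merging adjacent hot bucket ranges into a single span as the in-code comment describes ([a,b) followed by [b,c) becomes [a,c)). A standalone sketch of just that key-merging step, with simplified stand-in types:

```go
package main

import (
	"bytes"
	"fmt"
)

// bucketStat is a stand-in for buckets.BucketStat: one hot bucket's key range.
type bucketStat struct {
	StartKey, EndKey []byte
}

// mergeHotRanges folds adjacent hot buckets into split-key pairs, mirroring
// the merging rule in createSplitOperator: [a,b) followed by [b,c) yields
// [a,c), while a gap keeps the ranges separate.
func mergeHotRanges(stats []bucketStat) [][]byte {
	splitKey := make([][]byte, 0, 2*len(stats))
	for _, stat := range stats {
		if len(splitKey) > 0 && bytes.Equal(stat.StartKey, splitKey[len(splitKey)-1]) {
			// Contiguous with the previous range: extend it.
			splitKey[len(splitKey)-1] = stat.EndKey
			continue
		}
		splitKey = append(splitKey, stat.StartKey, stat.EndKey)
	}
	return splitKey
}

func main() {
	// Hot buckets [a,b), [b,c), [d,e), [e,f): the cold [c,d) gap splits them
	// into two spans, so the split keys are a, c, d, f.
	stats := []bucketStat{
		{[]byte("a"), []byte("b")},
		{[]byte("b"), []byte("c")},
		{[]byte("d"), []byte("e")},
		{[]byte("e"), []byte("f")},
	}
	for _, k := range mergeHotRanges(stats) {
		fmt.Printf("%s ", k)
	}
	fmt.Println() // a c d f
}
```

With the bucket layout used in the new test (hot [a,b), [b,c), [d,e), [e,f) and a cold [c,d) in between), this yields exactly the split keys a, c, d, f that TestSplitBuckets below expects.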
43 changes: 43 additions & 0 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -22,13 +22,15 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/operatorutil"
@@ -200,6 +202,47 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
}

func TestSplitBuckets(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
cancel, _, tc, oc := prepareSchedulersTest()
tc.SetHotRegionCacheHitsThreshold(1)
defer cancel()
hb, err := schedule.CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
solve := newBalanceSolver(hb.(*hotScheduler), tc, statistics.Read, transferLeader)
region := core.NewTestRegionInfo(1, 1, []byte(""), []byte(""))

// the hot ranges are [a,c) and [d,f)
b := &metapb.Buckets{
RegionId: 1,
PeriodInMs: 1000,
Keys: [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d"), []byte("e"), []byte("f")},
Stats: &metapb.BucketStats{
ReadBytes: []uint64{10 * units.KiB, 10 * units.KiB, 0, 10 * units.KiB, 10 * units.KiB},
ReadKeys: []uint64{256, 256, 0, 256, 256},
ReadQps: []uint64{0, 0, 0, 0, 0},
WriteBytes: []uint64{0, 0, 0, 0, 0},
WriteQps: []uint64{0, 0, 0, 0, 0},
WriteKeys: []uint64{0, 0, 0, 0, 0},
},
}

task := buckets.NewCheckPeerTask(b)
re.True(tc.HotBucketCache.CheckAsync(task))
time.Sleep(time.Millisecond * 10)
ops := solve.createSplitOperator([]*core.RegionInfo{region})
re.Equal(1, len(ops))
op := ops[0]
re.Equal(splitBucket, op.Desc())
expectKeys := [][]byte{[]byte("a"), []byte("c"), []byte("d"), []byte("f")}
expectOp, err := operator.CreateSplitRegionOperator(splitBucket, region, operator.OpSplit, pdpb.CheckPolicy_USEKEY, expectKeys)
re.NoError(err)
expectOp.GetCreateTime()
re.Equal(expectOp.Brief(), op.Brief())
re.Equal(expectOp.GetAdditionalInfo(), op.GetAdditionalInfo())
}

func checkHotWriteRegionScheduleByteRateOnly(re *require.Assertions, enablePlacementRules bool) {
cancel, opt, tc, oc := prepareSchedulersTest()
defer cancel()
8 changes: 5 additions & 3 deletions pkg/statistics/buckets/bucket_stat_informer.go
@@ -35,7 +35,7 @@ func init() {

// BucketStatInformer is used to get the bucket statistics.
type BucketStatInformer interface {
BucketsStats(degree int) map[uint64][]*BucketStat
BucketsStats(degree int, regions ...uint64) map[uint64][]*BucketStat
}

// BucketStat is the record the bucket statistics.
@@ -190,11 +190,13 @@ func (b *BucketTreeItem) calculateHotDegree() {
// TODO: qps should be considered, tikv will report this in next sprint
// the order: read [bytes keys qps] and write[bytes keys qps]
readLoads := stat.Loads[:2]
readHot := slice.AllOf(readLoads, func(i int) bool {
// keep same with the hot region hot degree
// https://github.com/tikv/pd/blob/6f6f545a6716840f7e2c7f4d8ed9b49f613a5cd8/pkg/statistics/hot_peer_cache.go#L220-L222
readHot := slice.AnyOf(readLoads, func(i int) bool {
return readLoads[i] > minHotThresholds[i]
})
writeLoads := stat.Loads[3:5]
writeHot := slice.AllOf(writeLoads, func(i int) bool {
writeHot := slice.AnyOf(writeLoads, func(i int) bool {
return writeLoads[i] > minHotThresholds[3+i]
})
hot := readHot || writeHot
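The bucket hot-degree check above is loosened from AllOf to AnyOf: a bucket now counts as read-hot (or write-hot) when any of its byte/key loads exceeds the per-dimension threshold, matching how the hot peer cache classifies hot peers (see the linked hot_peer_cache.go lines). Requiring all dimensions could keep a byte-heavy but key-light bucket from ever being marked hot. A minimal sketch of the two predicates, with made-up loads and thresholds:

```go
package main

import "fmt"

// anyOf / allOf mirror the semantics of pkg/slice's AnyOf and AllOf helpers.
func anyOf(n int, p func(i int) bool) bool {
	for i := 0; i < n; i++ {
		if p(i) {
			return true
		}
	}
	return false
}

func allOf(n int, p func(i int) bool) bool {
	for i := 0; i < n; i++ {
		if !p(i) {
			return false
		}
	}
	return true
}

func main() {
	// Hypothetical loads: bytes above the threshold, keys below it.
	readLoads := []float64{2048, 10} // [bytes, keys]
	thresholds := []float64{1024, 64}

	hot := func(i int) bool { return readLoads[i] > thresholds[i] }

	fmt.Println(allOf(len(readLoads), hot)) // false: the old check misses this bucket
	fmt.Println(anyOf(len(readLoads), hot)) // true: the new check marks it hot
}
```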