
Commit

Merge commit '21fef0309c2383e66b67976d4e00a0447693b37e' into tiflash-hot-write-sche

# Conflicts:
#	server/core/store.go
HunDunDM committed Aug 6, 2021
2 parents f945cc2 + 21fef03 commit f8b4e73
Showing 17 changed files with 436 additions and 39 deletions.
5 changes: 5 additions & 0 deletions errors.toml
@@ -91,6 +91,11 @@ error = '''
store %v is paused for leader transfer
'''

["PD:core:ErrSlowStoreEvicted"]
error = '''
store %v is evicted as a slow store
'''

["PD:core:ErrStoreDestroyed"]
error = '''
store %v has been physically destroyed
2 changes: 1 addition & 1 deletion go.mod
@@ -28,7 +28,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20210604082642-dda0a102bc6a
github.com/pingcap/kvproto v0.0.0-20210805052247-76981389e818
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5
github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296
4 changes: 2 additions & 2 deletions go.sum
@@ -317,8 +317,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210604082642-dda0a102bc6a h1:1vLA91bNdTnIDd3dwZ5VhwjVB5TLHzS23a47sNoYUlw=
github.com/pingcap/kvproto v0.0.0-20210604082642-dda0a102bc6a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210805052247-76981389e818 h1:7UlVx1vHJY3uFVa8LoQBEL+HPmu7pg3BnzZBWftX9m8=
github.com/pingcap/kvproto v0.0.0-20210805052247-76981389e818/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
1 change: 1 addition & 0 deletions pkg/errs/errno.go
@@ -54,6 +54,7 @@ var (
ErrStoreTombstone = errors.Normalize("store %v has been removed", errors.RFCCodeText("PD:core:ErrStoreTombstone"))
ErrStoreDestroyed = errors.Normalize("store %v has been physically destroyed", errors.RFCCodeText("PD:core:ErrStoreDestroyed"))
ErrStoreUnhealthy = errors.Normalize("store %v is unhealthy", errors.RFCCodeText("PD:core:ErrStoreUnhealthy"))
ErrSlowStoreEvicted = errors.Normalize("store %v is evicted as a slow store", errors.RFCCodeText("PD:core:ErrSlowStoreEvicted"))
)

// client errors
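A hedged sketch of how the new error code is raised in practice, mirroring the FastGenByArgs call that appears later in this diff in server/core/store.go; the helper name and import paths here are assumptions:

package sketch

import (
	"github.com/pingcap/log"
	"github.com/tikv/pd/pkg/errs"
)

// warnEvicted is a hypothetical helper: FastGenByArgs substitutes the %v
// placeholder, and the RFC code "PD:core:ErrSlowStoreEvicted" is the key
// documented in the errors.toml entry above.
func warnEvicted(storeID uint64) {
	err := errs.ErrSlowStoreEvicted.FastGenByArgs(storeID)
	log.Warn("store is evicted as a slow store", errs.ZapError(err))
}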
5 changes: 5 additions & 0 deletions server/api/scheduler.go
@@ -192,6 +192,11 @@ func (h *schedulerHandler) Post(w http.ResponseWriter, r *http.Request) {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case schedulers.EvictSlowStoreName:
if err := h.AddEvictSlowStoreScheduler(); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
default:
h.r.JSON(w, http.StatusBadRequest, "unknown scheduler")
return
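With this case in place, the scheduler POST endpoint accepts the new scheduler. A hedged sketch of registering it over HTTP follows; the PD address, the endpoint path, and the literal name "evict-slow-store-scheduler" are assumptions, since the diff only shows schedulers.EvictSlowStoreName being matched:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Assumed endpoint and scheduler name; adjust to the actual deployment.
	body := bytes.NewBufferString(`{"name": "evict-slow-store-scheduler"}`)
	resp, err := http.Post("http://127.0.0.1:2379/pd/api/v1/schedulers", "application/json", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // a non-200 status means AddEvictSlowStoreScheduler returned an error
}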
2 changes: 2 additions & 0 deletions server/api/store.go
@@ -54,6 +54,7 @@ type StoreStatus struct {
RegionWeight float64 `json:"region_weight"`
RegionScore float64 `json:"region_score"`
RegionSize int64 `json:"region_size"`
SlowScore uint64 `json:"slow_score"`
SendingSnapCount uint32 `json:"sending_snap_count,omitempty"`
ReceivingSnapCount uint32 `json:"receiving_snap_count,omitempty"`
IsBusy bool `json:"is_busy,omitempty"`
@@ -91,6 +92,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo
RegionWeight: store.GetRegionWeight(),
RegionScore: store.RegionScore(opt.RegionScoreFormulaVersion, opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionSize: store.GetRegionSize(),
SlowScore: store.GetSlowScore(),
SendingSnapCount: store.GetSendingSnapCount(),
ReceivingSnapCount: store.GetReceivingSnapCount(),
IsBusy: store.IsBusy(),
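The new field surfaces in the store API response. A minimal sketch of reading it; the endpoint path and the nesting of StoreStatus under "status" are assumptions, only the "slow_score" JSON tag comes from this diff:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type storeStatus struct {
	SlowScore uint64 `json:"slow_score"`
}

type storeInfo struct {
	Status storeStatus `json:"status"`
}

func main() {
	// Assumed PD address and store endpoint.
	resp, err := http.Get("http://127.0.0.1:2379/pd/api/v1/store/1")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	var info storeInfo
	if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
		panic(err)
	}
	fmt.Println("slow score:", info.Status.SlowScore)
}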
11 changes: 11 additions & 0 deletions server/cluster/cluster.go
@@ -1085,6 +1085,17 @@ func (c *RaftCluster) ResumeLeaderTransfer(storeID uint64) {
c.core.ResumeLeaderTransfer(storeID)
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
// leader to the store
func (c *RaftCluster) SlowStoreEvicted(storeID uint64) error {
return c.core.SlowStoreEvicted(storeID)
}

// SlowStoreRecovered cleans the evicted state of a store.
func (c *RaftCluster) SlowStoreRecovered(storeID uint64) {
c.core.SlowStoreRecovered(storeID)
}

// AttachAvailableFunc attaches an available function to a specific store.
func (c *RaftCluster) AttachAvailableFunc(storeID uint64, limitType storelimit.Type, f func() bool) {
c.core.AttachAvailableFunc(storeID, limitType, f)
18 changes: 18 additions & 0 deletions server/core/basic_cluster.go
@@ -145,6 +145,21 @@ func (bc *BasicCluster) ResumeLeaderTransfer(storeID uint64) {
bc.Stores.ResumeLeaderTransfer(storeID)
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
// leader to the store
func (bc *BasicCluster) SlowStoreEvicted(storeID uint64) error {
bc.Lock()
defer bc.Unlock()
return bc.Stores.SlowStoreEvicted(storeID)
}

// SlowStoreRecovered cleans the evicted state of a store.
func (bc *BasicCluster) SlowStoreRecovered(storeID uint64) {
bc.Lock()
defer bc.Unlock()
bc.Stores.SlowStoreRecovered(storeID)
}

// AttachAvailableFunc attaches an available function to a specific store.
func (bc *BasicCluster) AttachAvailableFunc(storeID uint64, limitType storelimit.Type, f func() bool) {
bc.Lock()
@@ -429,6 +444,9 @@ type StoreSetController interface {
PauseLeaderTransfer(id uint64) error
ResumeLeaderTransfer(id uint64)

SlowStoreEvicted(id uint64) error
SlowStoreRecovered(id uint64)

AttachAvailableFunc(id uint64, limitType storelimit.Type, f func() bool)
}

48 changes: 48 additions & 0 deletions server/core/store.go
@@ -33,6 +33,7 @@ const (
gb = 1 << 30 // 1GB size
initialMaxRegionCounts = 30 // exclude storage Threshold Filter when region less than 30
initialMinSpace = 1 << 33 // 2^33=8GB
slowStoreThreshold = 80

// EngineKey is the label key used to indicate engine.
EngineKey = "engine"
@@ -47,6 +48,7 @@ type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
leaderCount int
regionCount int
leaderSize int64
@@ -79,6 +81,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
meta: meta,
storeStats: s.storeStats,
pauseLeaderTransfer: s.pauseLeaderTransfer,
slowStoreEvicted: s.slowStoreEvicted,
leaderCount: s.leaderCount,
regionCount: s.regionCount,
leaderSize: s.leaderSize,
@@ -102,6 +105,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
meta: s.meta,
storeStats: s.storeStats,
pauseLeaderTransfer: s.pauseLeaderTransfer,
slowStoreEvicted: s.slowStoreEvicted,
leaderCount: s.leaderCount,
regionCount: s.regionCount,
leaderSize: s.leaderSize,
@@ -125,6 +129,11 @@ func (s *StoreInfo) AllowLeaderTransfer() bool {
return !s.pauseLeaderTransfer
}

// EvictedAsSlowStore returns whether the store has been evicted as a slow store.
func (s *StoreInfo) EvictedAsSlowStore() bool {
return s.slowStoreEvicted
}

// IsAvailable returns whether the store's limit bucket of the given type is available
func (s *StoreInfo) IsAvailable(limitType storelimit.Type) bool {
if s.available != nil && s.available[limitType] != nil {
@@ -148,6 +157,20 @@ func (s *StoreInfo) IsTombstone() bool {
return s.GetState() == metapb.StoreState_Tombstone
}

// GetSlowScore returns the slow score of the store.
func (s *StoreInfo) GetSlowScore() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.rawStats.GetSlowScore()
}

// IsSlow checks if the slow score reaches the threshold.
func (s *StoreInfo) IsSlow() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.rawStats.GetSlowScore() >= slowStoreThreshold
}

// IsPhysicallyDestroyed checks if the store's physically destroyed.
func (s *StoreInfo) IsPhysicallyDestroyed() bool {
return s.GetMeta().GetPhysicallyDestroyed()
@@ -562,6 +585,31 @@ func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64) {
s.stores[storeID] = store.Clone(ResumeLeaderTransfer())
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
// leader to the store
func (s *StoresInfo) SlowStoreEvicted(storeID uint64) error {
store, ok := s.stores[storeID]
if !ok {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}
if store.EvictedAsSlowStore() {
return errs.ErrSlowStoreEvicted.FastGenByArgs(storeID)
}
s.stores[storeID] = store.Clone(SlowStoreEvicted())
return nil
}

// SlowStoreRecovered cleans the evicted state of a store.
func (s *StoresInfo) SlowStoreRecovered(storeID uint64) {
store, ok := s.stores[storeID]
if !ok {
log.Warn("try to clean a store's evicted as a slow store state, but it is not found. It may be cleanup",
zap.Uint64("store-id", storeID))
return
}
s.stores[storeID] = store.Clone(SlowStoreRecovered())
}

// AttachAvailableFunc attaches f to a specific store.
func (s *StoresInfo) AttachAvailableFunc(storeID uint64, limitType storelimit.Type, f func() bool) {
if store, ok := s.stores[storeID]; ok {
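Taken together, the additions in this file define a slow-store lifecycle: rawStats.GetSlowScore() feeds IsSlow() against slowStoreThreshold (80), and the evicted flag is flipped through copy-on-write clones. A hedged sketch of the intended control flow, assuming the RaftCluster methods added earlier in this commit; the checkSlowStore helper and the import paths are assumptions, and the real scheduler lives in a file not loaded in this view:

package sketch

import (
	"github.com/pingcap/log"
	"github.com/tikv/pd/pkg/errs"
	"github.com/tikv/pd/server/cluster"
	"github.com/tikv/pd/server/core"
)

// checkSlowStore is a hypothetical helper, not part of this diff.
func checkSlowStore(c *cluster.RaftCluster, store *core.StoreInfo) {
	switch {
	case store.IsSlow() && !store.EvictedAsSlowStore():
		// the raw slow score has reached slowStoreThreshold (80)
		if err := c.SlowStoreEvicted(store.GetID()); err != nil {
			log.Warn("failed to evict slow store", errs.ZapError(err))
		}
	case !store.IsSlow() && store.EvictedAsSlowStore():
		// the store has recovered; allow leader transfer to it again
		c.SlowStoreRecovered(store.GetID())
	}
}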
15 changes: 15 additions & 0 deletions server/core/store_option.go
@@ -117,6 +117,21 @@ func ResumeLeaderTransfer() StoreCreateOption {
}
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
// leader to the store
func SlowStoreEvicted() StoreCreateOption {
return func(store *StoreInfo) {
store.slowStoreEvicted = true
}
}

// SlowStoreRecovered cleans the evicted state of a store.
func SlowStoreRecovered() StoreCreateOption {
return func(store *StoreInfo) {
store.slowStoreEvicted = false
}
}

// SetLeaderCount sets the leader count for the store.
func SetLeaderCount(leaderCount int) StoreCreateOption {
return func(store *StoreInfo) {
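These options follow the existing copy-on-write convention: a StoreInfo is never mutated in place, the stores map swaps in a Clone carrying the new flag, as the store.go hunk above does. A small illustration, assuming any existing *core.StoreInfo value; the helper name is hypothetical:

package sketch

import (
	"fmt"

	"github.com/tikv/pd/server/core"
)

// markAndRecover illustrates the copy-on-write option pattern.
func markAndRecover(store *core.StoreInfo) {
	evicted := store.Clone(core.SlowStoreEvicted())       // fresh copy with the flag set
	recovered := evicted.Clone(core.SlowStoreRecovered()) // another copy with the flag cleared
	fmt.Println(evicted.EvictedAsSlowStore(), recovered.EvictedAsSlowStore()) // true false
}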
5 changes: 5 additions & 0 deletions server/handler.go
@@ -290,6 +290,11 @@ func (h *Handler) AddShuffleHotRegionScheduler(limit uint64) error {
return h.AddScheduler(schedulers.ShuffleHotRegionType, strconv.FormatUint(limit, 10))
}

// AddEvictSlowStoreScheduler adds an evict-slow-store-scheduler.
func (h *Handler) AddEvictSlowStoreScheduler() error {
return h.AddScheduler(schedulers.EvictSlowStoreType)
}

// AddRandomMergeScheduler adds a random-merge-scheduler.
func (h *Handler) AddRandomMergeScheduler() error {
return h.AddScheduler(schedulers.RandomMergeType)
7 changes: 6 additions & 1 deletion server/schedule/filter/filters.go
@@ -315,6 +315,11 @@ func (f *StoreStateFilter) pauseLeaderTransfer(opt *config.PersistOptions, store
return !store.AllowLeaderTransfer()
}

func (f *StoreStateFilter) slowStoreEvicted(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "slow-store"
return store.EvictedAsSlowStore()
}

func (f *StoreStateFilter) isDisconnected(opt *config.PersistOptions, store *core.StoreInfo) bool {
f.Reason = "disconnected"
return !f.AllowTemporaryStates && store.IsDisconnected()
@@ -383,7 +388,7 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, opt *config.PersistOptions
funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
case leaderTarget:
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.pauseLeaderTransfer,
f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
f.slowStoreEvicted, f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
case regionTarget:
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isDisconnected, f.isBusy,
f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers}
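Adding slowStoreEvicted to the leaderTarget condition list means that once a store carries the flag, any transfer-leader candidate selection through StoreStateFilter skips it. A hedged sketch; the Target call signature is inferred from the FilterTarget usage elsewhere in this diff, and the helper is hypothetical:

package sketch

import (
	"github.com/tikv/pd/server/config"
	"github.com/tikv/pd/server/core"
	"github.com/tikv/pd/server/schedule/filter"
)

// rejectedAsLeaderTarget returns true when the store is filtered out as a
// leader target, for example because slowStoreEvicted is set.
func rejectedAsLeaderTarget(opts *config.PersistOptions, store *core.StoreInfo) bool {
	f := &filter.StoreStateFilter{ActionScope: "sketch", TransferLeader: true}
	return !f.Target(opts, store)
}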
73 changes: 38 additions & 35 deletions server/schedulers/evict_leader.go
@@ -235,35 +235,15 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
return allowed
}

func (s *evictLeaderScheduler) scheduleOnce(cluster opt.Cluster) []*operator.Operator {
ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWithRanges))
for id, ranges := range s.conf.StoreIDWithRanges {
region := cluster.RandLeaderRegion(id, ranges, opt.HealthRegion(cluster))
if region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-leader").Inc()
continue
}
func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()

target := filter.NewCandidates(cluster.GetFollowerStores(region)).
FilterTarget(cluster.GetOpts(), &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true}).
RandomPick()
if target == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-target-store").Inc()
continue
}
op, err := operator.CreateTransferLeaderOperator(EvictLeaderType, cluster, region, region.GetLeader().GetStoreId(), target.GetID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create evict leader operator", errs.ZapError(err))
continue
}
op.SetPriorityLevel(core.HighPriority)
op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator"))
ops = append(ops, op)
}
return ops
return scheduleEvictLeaderBatch(s.GetName(), cluster, s.conf.StoreIDWithRanges, EvictLeaderBatchSize)
}

func (s *evictLeaderScheduler) uniqueAppend(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator {
func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator {
regionIDs := make(map[uint64]struct{})
for i := range dst {
regionIDs[dst[i].RegionID()] = struct{}{}
@@ -278,25 +258,48 @@ func (s *evictLeaderScheduler) uniqueAppend(dst []*operator.Operator, src ...*op
return dst
}

func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
func scheduleEvictLeaderBatch(name string, cluster opt.Cluster, storeRanges map[uint64][]core.KeyRange, batchSize int) []*operator.Operator {
var ops []*operator.Operator
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()

for i := 0; i < EvictLeaderBatchSize; i++ {
once := s.scheduleOnce(cluster)
for i := 0; i < batchSize; i++ {
once := scheduleEvictLeaderOnce(name, cluster, storeRanges)
// no more regions
if len(once) == 0 {
break
}
ops = s.uniqueAppend(ops, once...)
ops = uniqueAppendOperator(ops, once...)
// the batch has been fulfilled
if len(ops) > EvictLeaderBatchSize {
if len(ops) > batchSize {
break
}
}
return ops
}

func scheduleEvictLeaderOnce(name string, cluster opt.Cluster, storeRanges map[uint64][]core.KeyRange) []*operator.Operator {
ops := make([]*operator.Operator, 0, len(storeRanges))
for id, ranges := range storeRanges {
region := cluster.RandLeaderRegion(id, ranges, opt.HealthRegion(cluster))
if region == nil {
schedulerCounter.WithLabelValues(name, "no-leader").Inc()
continue
}

target := filter.NewCandidates(cluster.GetFollowerStores(region)).
FilterTarget(cluster.GetOpts(), &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true}).
RandomPick()
if target == nil {
schedulerCounter.WithLabelValues(name, "no-target-store").Inc()
continue
}
op, err := operator.CreateTransferLeaderOperator(EvictLeaderType, cluster, region, region.GetLeader().GetStoreId(), target.GetID(), operator.OpLeader)
if err != nil {
log.Debug("fail to create evict leader operator", errs.ZapError(err))
continue
}
op.SetPriorityLevel(core.HighPriority)
op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(name, "new-operator"))
ops = append(ops, op)
}
return ops
}

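The point of pulling the logic out of the evictLeaderScheduler methods into the free functions scheduleEvictLeaderOnce, scheduleEvictLeaderBatch, and uniqueAppendOperator is reuse: a scheduler that evicts leaders from a single slow store can now call the same batch logic with its own store-to-ranges map. A hedged sketch of such a caller, written as if it sat in the same schedulers package; the evictSlowStoreScheduler type and its evictedStoreID field are assumptions, and the real implementation is in a file not loaded in this view:

// Hypothetical caller in package schedulers, next to the code above.
func (s *evictSlowStoreScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
	schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
	storeRanges := map[uint64][]core.KeyRange{
		s.evictedStoreID: {core.NewKeyRange("", "")}, // evict leaders across the whole key space
	}
	return scheduleEvictLeaderBatch(s.GetName(), cluster, storeRanges, EvictLeaderBatchSize)
}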
(The remaining changed files are not loaded in this view.)
