Skip to content

Commit

Permalink
refactor snap size
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Jul 27, 2022
1 parent f826930 commit 14cffea
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 43 deletions.
4 changes: 4 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,6 +2064,10 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) {
for _, limitType := range storelimit.TypeNameValue {
c.core.ResetStoreLimit(storeID, limitType)
}

for _, snapType := range storelimit.SnapTypeNameValue {
c.core.ResetSnapLimit(storeID, snapType)
}
delete(cfg.StoreLimit, storeID)
c.opt.SetScheduleConfig(cfg)
var err error
Expand Down
7 changes: 7 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Typ
bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...)
}

// ResetSnapLimit resets the snapshot limit for the given store.
func (bc *BasicCluster) ResetSnapLimit(storeID uint64, limitType storelimit.SnapType, cap ...int64) {
bc.Lock()
defer bc.Unlock()
bc.Stores.ResetSnapLimit(storeID, limitType, cap...)
}

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64) {
bc.Lock()
Expand Down
29 changes: 17 additions & 12 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type StoreInfo struct {
leaderWeight float64
regionWeight float64
limiter map[storelimit.Type]*storelimit.StoreLimit
snapLimiter map[storelimit.Type]*storelimit.SlidingWindows
snapLimiter map[storelimit.SnapType]*storelimit.SlidingWindows
minResolvedTS uint64
}

Expand All @@ -70,7 +70,7 @@ func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
snapLimiter: make(map[storelimit.Type]*storelimit.SlidingWindows),
snapLimiter: make(map[storelimit.SnapType]*storelimit.SlidingWindows),
minResolvedTS: 0,
}
for _, opt := range opts {
Expand Down Expand Up @@ -159,14 +159,12 @@ func (s *StoreInfo) IsAvailable(limitType storelimit.Type) bool {
return true
}

// IsAvailableSnap returns ture if the store snapshot available size is
// over than the given token.
func (s *StoreInfo) IsAvailableSnap(snapType storelimit.Type) bool {
// IsAvailableSnap returns ture if the store have available size.
func (s *StoreInfo) IsAvailableSnap(snapType storelimit.SnapType) bool {
s.mu.RLock()
defer s.mu.RUnlock()
notNil := s.snapLimiter != nil && s.snapLimiter[snapType] != nil

if notNil {
if s.snapLimiter != nil && s.snapLimiter[snapType] != nil {
isAvailable := s.snapLimiter[snapType].Available(0)
return isAvailable
}
Expand Down Expand Up @@ -319,8 +317,8 @@ func (s *StoreInfo) GetStoreLimit(limitType storelimit.Type) *storelimit.StoreLi
return s.limiter[limitType]
}

// GetSnapLimit returns the snapshot limit of the given storelimit.Type.
func (s *StoreInfo) GetSnapLimit(snapType storelimit.Type) *storelimit.SlidingWindows {
// GetSnapLimit returns the snapshot limit of the given store.
func (s *StoreInfo) GetSnapLimit(snapType storelimit.SnapType) *storelimit.SlidingWindows {
s.mu.RLock()
defer s.mu.RUnlock()
return s.snapLimiter[snapType]
Expand Down Expand Up @@ -686,14 +684,21 @@ func (s *StoresInfo) SlowStoreRecovered(storeID uint64) {
}

// defaultSnapSize is the default snapshot size of the
const defaultSnapSize = 100 * 10
const defaultSnapSize = int64(100 * 10)

// ResetStoreLimit resets the limit for a specific store.
func (s *StoresInfo) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) {
if store, ok := s.stores[storeID]; ok {
s.stores[storeID] = store.Clone(
ResetStoreLimit(limitType, ratePerSec...),
ResetSnapLimit(limitType, defaultSnapSize))
ResetStoreLimit(limitType, ratePerSec...))
}
}

// ResetSnapLimit resets the snapshot limit for the given store.
func (s *StoresInfo) ResetSnapLimit(storeID uint64, snapType storelimit.SnapType, cap ...int64) {
if store, ok := s.stores[storeID]; ok {
s.stores[storeID] = store.Clone(
ResetSnapLimit(snapType, cap...))
}
}

Expand Down
12 changes: 6 additions & 6 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,18 @@ func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCrea
}
}

func ResetSnapLimit(snapType storelimit.Type, capacity int64) StoreCreateOption {
func ResetSnapLimit(snapType storelimit.SnapType, capacity ...int64) StoreCreateOption {
return func(store *StoreInfo) {
store.mu.Lock()
defer store.mu.Unlock()
if capacity == 0 {
store.snapLimiter[snapType] = nil
return
cap := defaultSnapSize
if len(capacity) > 0 {
cap = capacity[0]
}
if limiter := store.snapLimiter[snapType]; limiter != nil {
limiter.Adjust(capacity)
limiter.Adjust(cap)
} else {
store.snapLimiter[snapType] = storelimit.NewSlidingWindows(capacity)
store.snapLimiter[snapType] = storelimit.NewSlidingWindows(cap)
}
}
}
25 changes: 25 additions & 0 deletions server/core/storelimit/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@ import (

const snapSize = 10

type SnapType int

const (
// RecvSnapShot indicates the type of store limit that limits the adding peer rate
RecvSnapShot SnapType = iota
// SendSnapShot indicates the type of store limit that limits the leader peer rate
SendSnapShot
)

// SnapTypeNameValue indicates the name of store limit type and the enum value
var SnapTypeNameValue = map[string]SnapType{
"recv-snapshot": RecvSnapShot,
"send-snapshot": SendSnapShot,
}

// String returns the representation of the Type
func (t SnapType) String() string {
for n, v := range SnapTypeNameValue {
if v == t {
return n
}
}
return ""
}

// SlidingWindows limits the operators of a store
type SlidingWindows struct {
mu syncutil.Mutex
Expand Down
4 changes: 2 additions & 2 deletions server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ func (f *StoreStateFilter) isBusy(opt *config.PersistOptions, store *core.StoreI
}

func (f *StoreStateFilter) exceedRecvSnapLimit(_ *config.PersistOptions, store *core.StoreInfo) plan.Status {
if !f.AllowTemporaryStates && !store.IsAvailableSnap(storelimit.AddPeer) {
f.Reason = "exceed-recv-snap-limit"
if !f.AllowTemporaryStates && !store.IsAvailableSnap(storelimit.RecvSnapShot) {
f.Reason = "exceed-recv-snapshot-limit"
return statusStoreSnapRemoveLimit
}
f.Reason = ""
Expand Down
8 changes: 4 additions & 4 deletions server/schedule/operator/influence.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type StoreInfluence struct {
LeaderSize int64
LeaderCount int64
StepCost map[storelimit.Type]int64
SnapCost map[storelimit.Type]int64
SnapCost map[storelimit.SnapType]int64
}

// ResourceProperty returns delta size of leader/region by influence.
Expand All @@ -64,7 +64,7 @@ func (s StoreInfluence) ResourceProperty(kind core.ScheduleKind) int64 {
}

// GetSnapCost returns the given snapshot size.
func (s StoreInfluence) GetSnapCost(snapType storelimit.Type) int64 {
func (s StoreInfluence) GetSnapCost(snapType storelimit.SnapType) int64 {
if s.SnapCost == nil {
return 0
}
Expand All @@ -80,9 +80,9 @@ func (s StoreInfluence) GetStepCost(limitType storelimit.Type) int64 {
}

// AddSnapCost adds the step cost of specific type store limit according to region size.
func (s *StoreInfluence) AddSnapCost(limitType storelimit.Type, cost int64) {
func (s *StoreInfluence) AddSnapCost(limitType storelimit.SnapType, cost int64) {
if s.SnapCost == nil {
s.SnapCost = make(map[storelimit.Type]int64)
s.SnapCost = make(map[storelimit.SnapType]int64)
}
s.SnapCost[limitType] += cost
}
Expand Down
4 changes: 2 additions & 2 deletions server/schedule/operator/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
if ap.IsLightWeight {
return
}
to.AddSnapCost(storelimit.AddPeer, regionSize)
to.AddSnapCost(storelimit.RecvSnapShot, regionSize)
to.AdjustStepCost(storelimit.AddPeer, regionSize)
}

Expand Down Expand Up @@ -230,7 +230,7 @@ func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo)
if al.IsLightWeight {
return
}
to.AddSnapCost(storelimit.AddPeer, regionSize)
to.AddSnapCost(storelimit.RecvSnapShot, regionSize)
to.AdjustStepCost(storelimit.AddPeer, regionSize)
}

Expand Down
56 changes: 40 additions & 16 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
if stepCost > 0 && storeLimit != nil {
storeLimit.Take(stepCost)
}
storeLimitCostCounter.WithLabelValues(strconv.FormatUint(storeID, 10), n).Add(float64(stepCost) / float64(storelimit.RegionInfluence[v]))
}

for n, v := range storelimit.SnapTypeNameValue {
snapCost := opInfluence.GetStoreInfluence(storeID).GetSnapCost(v)
snapLimit := store.GetSnapLimit(v)
if snapCost > 0 && snapLimit != nil {
Expand All @@ -499,7 +502,6 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
zap.String("limit-type", n))
snapLimit.Take(snapCost)
}
storeLimitCostCounter.WithLabelValues(strconv.FormatUint(storeID, 10), n).Add(float64(stepCost) / float64(storelimit.RegionInfluence[v]))
}
}
oc.updateCounts(oc.operators)
Expand Down Expand Up @@ -759,7 +761,7 @@ func (oc *OperatorController) pushFastOperator(op *operator.Operator) {
func (oc *OperatorController) Ack(op *operator.Operator) {
opInfluence := NewTotalOpInfluence([]*operator.Operator{op}, oc.cluster)
for storeID := range opInfluence.StoresInfluence {
for _, v := range storelimit.TypeNameValue {
for _, v := range storelimit.SnapTypeNameValue {
snapCost := opInfluence.GetStoreInfluence(storeID).GetSnapCost(v)
log.Info("snapshot size will reset",
zap.Uint64("store-id", storeID),
Expand Down Expand Up @@ -951,39 +953,61 @@ func (oc *OperatorController) exceedStoreLimitLocked(ops ...*operator.Operator)
opInfluence := NewTotalOpInfluence(ops, oc.cluster)
for storeID := range opInfluence.StoresInfluence {
for _, v := range storelimit.TypeNameValue {
snapCost := opInfluence.GetStoreInfluence(storeID).GetSnapCost(v)
stepCost := opInfluence.GetStoreInfluence(storeID).GetStepCost(v)
limit, snapLimit := oc.getOrCreateStoreLimit(storeID, v)

if stepCost > 0 && limit != nil {
if !snapLimit.Available(snapCost) {
return true
}
if stepCost == 0 {
continue
}
limit := oc.getOrCreateStoreLimit(storeID, v)
if limit == nil {
continue
}
if !limit.Available(stepCost) {
return true
}
}

if snapCost > 0 && snapLimit != nil {
if !snapLimit.Available(snapCost) {
return true
}
for _, v := range storelimit.SnapTypeNameValue {
snapCost := opInfluence.GetStoreInfluence(storeID).GetSnapCost(v)
if snapCost == 0 {
continue
}
limit := oc.getOrCreateSnapLimit(storeID, v)
if limit == nil {
continue
}
if !limit.Available(snapCost) {
return true
}
}
}
return false
}

// getOrCreateStoreLimit is used to get or create the limit of a store.
func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64, limitType storelimit.Type) (*storelimit.StoreLimit, *storelimit.SlidingWindows) {
func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64, limitType storelimit.Type) *storelimit.StoreLimit {
ratePerSec := oc.cluster.GetOpts().GetStoreLimitByType(storeID, limitType) / StoreBalanceBaseTime
s := oc.cluster.GetStore(storeID)
if s == nil {
log.Error("invalid store ID", zap.Uint64("store-id", storeID))
return nil, nil
return nil
}
if s.GetStoreLimit(limitType) == nil {
oc.cluster.GetBasicCluster().ResetStoreLimit(storeID, limitType, ratePerSec)
}
if ratePerSec != s.GetStoreLimit(limitType).Rate() {
oc.cluster.GetBasicCluster().ResetStoreLimit(storeID, limitType, ratePerSec)
}
return s.GetStoreLimit(limitType), s.GetSnapLimit(limitType)
return s.GetStoreLimit(limitType)
}

func (oc *OperatorController) getOrCreateSnapLimit(storeID uint64, snapType storelimit.SnapType) *storelimit.SlidingWindows {
s := oc.cluster.GetStore(storeID)
if s == nil {
log.Error("invalid store ID", zap.Uint64("store-id", storeID))
return nil
}
if s.GetSnapLimit(snapType) == nil {
oc.cluster.GetBasicCluster().ResetSnapLimit(storeID, snapType)
}
return s.GetSnapLimit(snapType)
}
3 changes: 2 additions & 1 deletion server/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
storeStatusGauge.WithLabelValues(storeAddress, id, "store_available").Set(float64(store.GetAvailable()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_used").Set(float64(store.GetUsedSize()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_capacity").Set(float64(store.GetCapacity()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_used_snapshot_size").Set(float64(store.GetSnapLimit(storelimit.AddPeer).GetUsed()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_recv_used_snapshot_size").Set(float64(store.GetSnapLimit(storelimit.RecvSnapShot).GetUsed()))
storeStatusGauge.WithLabelValues(storeAddress, id, "store_send_used_snapshot_size").Set(float64(store.GetSnapLimit(storelimit.SendSnapShot).GetUsed()))
// Store flows.
storeFlowStats := stats.GetRollingStoreStats(store.GetID())
if storeFlowStats == nil {
Expand Down

0 comments on commit 14cffea

Please sign in to comment.