Skip to content

Commit

Permalink
schedulers: unify the use and GC of hot-region's pendings and regionPendings (#3921)
Browse files Browse the repository at this point in the history

Signed-off-by: HunDunDM <hundundm@gmail.com>
  • Loading branch information
HunDunDM committed Sep 8, 2021
1 parent fdadb32 commit 9a0d2b3
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 49 deletions.
12 changes: 12 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ func (o *Operator) Status() OpStatus {
return o.status.Status()
}

// CheckAndGetStatus refreshes the operator's state via `CheckExpired` and
// `CheckTimeout`, then reports the resulting status.
func (o *Operator) CheckAndGetStatus() OpStatus {
	if o.CheckExpired() {
		return EXPIRED
	}
	if o.CheckTimeout() {
		return TIMEOUT
	}
	return o.Status()
}

// GetReachTimeOf returns the time when operator reaches the given status.
func (o *Operator) GetReachTimeOf(st OpStatus) time.Time {
return o.status.ReachTimeOf(st)
Expand Down
70 changes: 41 additions & 29 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,22 +266,32 @@ func getHotRegionThreshold(stats *statistics.StoresStats, typ rwType) [2]uint64
}
}

// summaryPendingInfluence calculate the summary of pending Influence for each store
// and clean the region from regionInfluence if they have ended operator.
// It makes each dim rate or count become `weight` times to the origin value.
func (h *hotScheduler) summaryPendingInfluence() {
maxZombieDur := h.conf.GetMaxZombieDuration()
for ty := resourceType(0); ty < resourceTypeLen; ty++ {
h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], h.calcPendingWeight)
}
h.gcRegionPendings()
}

func (h *hotScheduler) gcRegionPendings() {
for regionID, op := range h.regionPendings {
if op != nil && op.IsEnd() {
if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration()))
ret := make(map[uint64]Influence)
pendings := h.pendings[ty]
for p := range pendings {
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)
if needGC {
id := p.op.RegionID()
delete(h.regionPendings, id)
delete(pendings, p)
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
delete(h.regionPendings, regionID)
log.Debug("gc pending influence in hot region scheduler",
zap.Uint64("region-id", id),
zap.Time("create", p.op.GetCreateTime()),
zap.Time("now", time.Now()),
zap.Duration("zombie", maxZombieDur))
continue
}
ret[p.to] = ret[p.to].add(&p.origin, weight)
ret[p.from] = ret[p.from].add(&p.origin, -weight)
}
h.pendingSums[ty] = ret
}
}

Expand Down Expand Up @@ -1104,26 +1114,28 @@ func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]Influenc
return ret
}

func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 {
if op.CheckExpired() || op.CheckTimeout() {
return 0
}
status := op.Status()
// calcPendingInfluence return the calculate weight of one Operator, the value will between [0,1]
func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) {
status := op.CheckAndGetStatus()
if !operator.IsEndStatus(status) {
return 1
}
switch status {
case operator.SUCCESS:
zombieDur := time.Since(op.GetReachTimeOf(status))
maxZombieDur := h.conf.GetMaxZombieDuration()
if zombieDur >= maxZombieDur {
return 0
}
// TODO: use store statistics update time to make a more accurate estimation
return float64(maxZombieDur-zombieDur) / float64(maxZombieDur)
default:
return 0
return 1, false
}

// TODO: use store statistics update time to make a more accurate estimation
zombieDur := time.Since(op.GetReachTimeOf(status))
if zombieDur >= maxZombieDur {
weight = 0
} else {
weight = 1
}

needGC = weight == 0
if status != operator.SUCCESS {
// CANCELED, REPLACED, TIMEOUT, EXPIRED, etc.
// The actual weight is 0, but there is still a delay in GC.
weight = 0
}
return
}

func (h *hotScheduler) clearPendingInfluence() {
Expand Down
20 changes: 13 additions & 7 deletions server/schedulers/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
}
c.Assert(err, IsNil)
c.Assert(op, NotNil)
op.Start()
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
return op
}
doneOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
Expand All @@ -68,26 +71,29 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) {
}
shouldRemoveOp := func(region *core.RegionInfo, ty opType) *operator.Operator {
op := doneOp(region, ty)
operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
operator.SetOperatorStatusReachTime(op, operator.CANCELED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second))
return op
}
opCreaters := [3]func(region *core.RegionInfo, ty opType) *operator.Operator{shouldRemoveOp, notDoneOp, doneOp}

typs := []opType{movePeer, transferLeader}

for i := 0; i < len(opCreaters); i++ {
for i, creator := range opCreaters {
for j, typ := range typs {
regionID := uint64(i*len(opCreaters) + j + 1)
regionID := uint64(i*len(typs) + j + 1)
region := newTestRegion(regionID)
hb.regionPendings[regionID] = opCreaters[i](region, typ)
op := creator(region, typ)
influence := newPendingInfluence(op, 2, 4, Influence{})
hb.pendings[writePeer][influence] = struct{}{}
hb.regionPendings[regionID] = op
}
}

hb.gcRegionPendings()
hb.summaryPendingInfluence() // Calling this function will GC.

for i := 0; i < len(opCreaters); i++ {
for i := range opCreaters {
for j, typ := range typs {
regionID := uint64(i*len(opCreaters) + j + 1)
regionID := uint64(i*len(typs) + j + 1)
if i < 1 { // shouldRemoveOp
c.Assert(hb.regionPendings, Not(HasKey), regionID)
} else { // notDoneOp, doneOp
Expand Down
13 changes: 0 additions & 13 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,6 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence)
}
}

// summaryPendingInfluence folds every pending influence into a per-store
// summary, weighting each entry by f(op); entries whose weight is 0 are
// removed from the set as a side effect.
func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]Influence {
	summary := make(map[uint64]Influence)
	for pending := range pendings {
		weight := f(pending.op)
		if weight == 0 {
			// Deleting during range is safe in Go.
			delete(pendings, pending)
		}
		summary[pending.to] = summary[pending.to].add(&pending.origin, weight)
		summary[pending.from] = summary[pending.from].add(&pending.origin, -weight)
	}
	return summary
}

type storeLoad struct {
ByteRate float64
KeyRate float64
Expand Down

0 comments on commit 9a0d2b3

Please sign in to comment.