diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go index d38a4bdcb01..fb7495896d2 100644 --- a/server/schedule/operator/operator.go +++ b/server/schedule/operator/operator.go @@ -149,6 +149,18 @@ func (o *Operator) Status() OpStatus { return o.status.Status() } +// CheckAndGetStatus returns EXPIRED if `CheckExpired` reports true, TIMEOUT if `CheckTimeout` does, and the current status otherwise. +func (o *Operator) CheckAndGetStatus() OpStatus { + switch { + case o.CheckExpired(): + return EXPIRED + case o.CheckTimeout(): + return TIMEOUT + default: + return o.Status() + } +} + // GetReachTimeOf returns the time when operator reaches the given status. func (o *Operator) GetReachTimeOf(st OpStatus) time.Time { return o.status.ReachTimeOf(st) diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index a413b5a675a..24811667c88 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -266,22 +266,32 @@ func getHotRegionThreshold(stats *statistics.StoresStats, typ rwType) [2]uint64 } } +// summaryPendingInfluence calculates the summary of pending Influence for each store +// and removes entries from pendings and regionPendings once their ended operators need GC. +// Each dim rate or count is scaled to `weight` times its original value. 
func (h *hotScheduler) summaryPendingInfluence() { + maxZombieDur := h.conf.GetMaxZombieDuration() for ty := resourceType(0); ty < resourceTypeLen; ty++ { - h.pendingSums[ty] = summaryPendingInfluence(h.pendings[ty], h.calcPendingWeight) - } - h.gcRegionPendings() -} - -func (h *hotScheduler) gcRegionPendings() { - for regionID, op := range h.regionPendings { - if op != nil && op.IsEnd() { - if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) { - log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration())) + ret := make(map[uint64]Influence) + pendings := h.pendings[ty] + for p := range pendings { + weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur) + if needGC { + id := p.op.RegionID() + delete(h.regionPendings, id) + delete(pendings, p) schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec() - delete(h.regionPendings, regionID) + log.Debug("gc pending influence in hot region scheduler", + zap.Uint64("region-id", id), + zap.Time("create", p.op.GetCreateTime()), + zap.Time("now", time.Now()), + zap.Duration("zombie", maxZombieDur)) + continue } + ret[p.to] = ret[p.to].add(&p.origin, weight) + ret[p.from] = ret[p.from].add(&p.origin, -weight) } + h.pendingSums[ty] = ret } } @@ -1104,26 +1114,28 @@ func (h *hotScheduler) copyPendingInfluence(ty resourceType) map[uint64]Influenc return ret } -func (h *hotScheduler) calcPendingWeight(op *operator.Operator) float64 { - if op.CheckExpired() || op.CheckTimeout() { - return 0 - } - status := op.Status() +// calcPendingInfluence returns the weight of one Operator's pending influence, which is within [0,1] (currently either 0 or 1), and whether it needs GC. +func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur time.Duration) (weight float64, needGC bool) { + status := op.CheckAndGetStatus() if !operator.IsEndStatus(status) { - return 1 - } - switch 
status { - case operator.SUCCESS: - zombieDur := time.Since(op.GetReachTimeOf(status)) - maxZombieDur := h.conf.GetMaxZombieDuration() - if zombieDur >= maxZombieDur { - return 0 - } - // TODO: use store statistics update time to make a more accurate estimation - return float64(maxZombieDur-zombieDur) / float64(maxZombieDur) - default: - return 0 + return 1, false + } + + // TODO: use store statistics update time to make a more accurate estimation + zombieDur := time.Since(op.GetReachTimeOf(status)) + if zombieDur >= maxZombieDur { + weight = 0 + } else { + weight = 1 + } + + needGC = weight == 0 + if status != operator.SUCCESS { + // Ended without SUCCESS: CANCELED, REPLACED, TIMEOUT, EXPIRED, etc. + // The influence weight drops to 0 at once, while GC still waits for maxZombieDur. + weight = 0 } + return } func (h *hotScheduler) clearPendingInfluence() { diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go index 1c04becb080..4bb328b5161 100644 --- a/server/schedulers/hot_test.go +++ b/server/schedulers/hot_test.go @@ -59,6 +59,9 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { } c.Assert(err, IsNil) c.Assert(op, NotNil) + op.Start() + operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second)) + operator.SetOperatorStatusReachTime(op, operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second)) return op } doneOp := func(region *core.RegionInfo, ty opType) *operator.Operator { @@ -68,26 +71,29 @@ func (s *testHotSchedulerSuite) TestGCPendingOpInfos(c *C) { } shouldRemoveOp := func(region *core.RegionInfo, ty opType) *operator.Operator { op := doneOp(region, ty) - operator.SetOperatorStatusReachTime(op, operator.CREATED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second)) + operator.SetOperatorStatusReachTime(op, operator.CANCELED, time.Now().Add(-3*statistics.StoreHeartBeatReportInterval*time.Second)) return op } opCreaters := [3]func(region 
*core.RegionInfo, ty opType) *operator.Operator{shouldRemoveOp, notDoneOp, doneOp} typs := []opType{movePeer, transferLeader} - for i := 0; i < len(opCreaters); i++ { + for i, creator := range opCreaters { for j, typ := range typs { - regionID := uint64(i*len(opCreaters) + j + 1) + regionID := uint64(i*len(typs) + j + 1) region := newTestRegion(regionID) - hb.regionPendings[regionID] = opCreaters[i](region, typ) + op := creator(region, typ) + influence := newPendingInfluence(op, 2, 4, Influence{}) + hb.pendings[writePeer][influence] = struct{}{} + hb.regionPendings[regionID] = op } } - hb.gcRegionPendings() + hb.summaryPendingInfluence() // Summarizing also GCs ended operators that have outlived the zombie duration. - for i := 0; i < len(opCreaters); i++ { + for i := range opCreaters { for j, typ := range typs { - regionID := uint64(i*len(opCreaters) + j + 1) + regionID := uint64(i*len(typs) + j + 1) if i < 1 { // shouldRemoveOp c.Assert(hb.regionPendings, Not(HasKey), regionID) } else { // notDoneOp, doneOp diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 166ed211eb1..9fb2c5cfc0d 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -190,19 +190,6 @@ func newPendingInfluence(op *operator.Operator, from, to uint64, infl Influence) } } -func summaryPendingInfluence(pendings map[*pendingInfluence]struct{}, f func(*operator.Operator) float64) map[uint64]Influence { - ret := map[uint64]Influence{} - for p := range pendings { - w := f(p.op) - if w == 0 { - delete(pendings, p) - } - ret[p.to] = ret[p.to].add(&p.origin, w) - ret[p.from] = ret[p.from].add(&p.origin, -w) - } - return ret -} - type storeLoad struct { ByteRate float64 KeyRate float64