Skip to content

Commit

Permalink
filter: refactor StoreStateFilter (#2578)
Browse files Browse the repository at this point in the history
Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Jul 1, 2020
1 parent 85c8a6d commit af5511a
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 46 deletions.
10 changes: 8 additions & 2 deletions pkg/mock/mockoption/mockoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,13 @@ func (mso *ScheduleOptions) GetKeyType() core.KeyType {
return core.StringToKeyType(mso.KeyType)
}

// CheckLabelProperty mocks method
// CheckLabelProperty mocks method. It checks if there is any label
// has the same key as typ.
func (mso *ScheduleOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
return true
for _, l := range labels {
if l.Key == typ {
return true
}
}
return false
}
145 changes: 101 additions & 44 deletions server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ type StoreStateFilter struct {
TransferLeader bool
// Set true if the schedule involves any move region operation.
MoveRegion bool
// Set true if allows temporary states.
AllowTemporaryStates bool
}

// Scope returns the scheduler or the checker which the filter acts on.
Expand All @@ -370,64 +372,119 @@ func (f StoreStateFilter) Type() string {
return "store-state-filter"
}

// Source returns true when the store can be selected as the schedule
// source.
func (f StoreStateFilter) Source(opt opt.Options, store *core.StoreInfo) bool {
if store.IsTombstone() ||
store.DownTime() > opt.GetMaxStoreDownTime() {
return false
}
if f.TransferLeader && (store.IsDisconnected() || store.IsBlocked()) {
return false
}
// conditionFunc defines condition to determine a store should be selected.
// It should consider if the filter allows temporary states.
type conditionFunc func(opt.Options, *core.StoreInfo) bool

if f.MoveRegion && !f.filterMoveRegion(opt, true, store) {
return false
}
return true
func (f StoreStateFilter) isTombstone(opt opt.Options, store *core.StoreInfo) bool {
return store.IsTombstone()
}

// Target returns true when the store can be selected as the schedule
// target.
func (f StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
if store.IsTombstone() ||
store.IsOffline() ||
store.DownTime() > opts.GetMaxStoreDownTime() {
return false
}
if f.TransferLeader &&
(store.IsDisconnected() ||
store.IsBlocked() ||
store.IsBusy() ||
opts.CheckLabelProperty(opt.RejectLeader, store.GetLabels())) {
return false
}
func (f StoreStateFilter) isDown(opt opt.Options, store *core.StoreInfo) bool {
return store.DownTime() > opt.GetMaxStoreDownTime()
}

if f.MoveRegion {
// only target consider the pending peers because pending more means the disk is slower.
if opts.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opts.GetMaxPendingPeerCount()) {
return false
}
func (f StoreStateFilter) isOffline(opt opt.Options, store *core.StoreInfo) bool {
return store.IsOffline()
}

if !f.filterMoveRegion(opts, false, store) {
return false
func (f StoreStateFilter) isBlockLeaderTransfer(opt opt.Options, store *core.StoreInfo) bool {
return store.IsBlocked()
}

func (f StoreStateFilter) isDisconnected(opt opt.Options, store *core.StoreInfo) bool {
return !f.AllowTemporaryStates && store.IsDisconnected()
}

func (f StoreStateFilter) isBusy(opt opt.Options, store *core.StoreInfo) bool {
return !f.AllowTemporaryStates && store.IsBusy()
}

func (f StoreStateFilter) exceedRemoveLimit(opt opt.Options, store *core.StoreInfo) bool {
return !f.AllowTemporaryStates && !store.IsAvailable(storelimit.RemovePeer)
}

func (f StoreStateFilter) exceedAddLimit(opt opt.Options, store *core.StoreInfo) bool {
return !f.AllowTemporaryStates && !store.IsAvailable(storelimit.AddPeer)
}

func (f StoreStateFilter) tooManySnapshots(opt opt.Options, store *core.StoreInfo) bool {
return !f.AllowTemporaryStates && (uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount())
}

func (f StoreStateFilter) tooManyPendingPeers(opt opt.Options, store *core.StoreInfo) bool {
return !f.AllowTemporaryStates &&
opt.GetMaxPendingPeerCount() > 0 &&
store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount())
}

func (f StoreStateFilter) hasRejectLeaderProperty(opts opt.Options, store *core.StoreInfo) bool {
return opts.CheckLabelProperty(opt.RejectLeader, store.GetLabels())
}

// The condition table.
// Y: the condition is temporary (expected to become false soon).
// N: the condition is expected to be true for a long time.
// X means when the condition is true, the store CANNOT be selected.
//
// Condition Down Offline Tomb Block Disconn Busy RmLimit AddLimit Snap Pending Reject
// IsTemporary N N N N Y Y Y Y Y Y N
//
// LeaderSource X X X X
// RegionSource X X X
// LeaderTarget X X X X X X X
// RegionTarget X X X X X X X X

const (
leaderSource = iota
regionSource
leaderTarget
regionTarget
)

func (f StoreStateFilter) anyConditionMatch(typ int, opt opt.Options, store *core.StoreInfo) bool {
var funcs []conditionFunc
switch typ {
case leaderSource:
funcs = []conditionFunc{f.isTombstone, f.isDown, f.isBlockLeaderTransfer, f.isDisconnected}
case regionSource:
funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
case leaderTarget:
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isBlockLeaderTransfer,
f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
case regionTarget:
funcs = []conditionFunc{f.isTombstone, f.isOffline, f.isDown, f.isDisconnected, f.isBusy,
f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers}
}
for _, cf := range funcs {
if cf(opt, store) {
return true
}
}
return true
return false
}

func (f StoreStateFilter) filterMoveRegion(opt opt.Options, isSource bool, store *core.StoreInfo) bool {
if store.IsBusy() {
// Source returns true when the store can be selected as the schedule
// source.
func (f StoreStateFilter) Source(opts opt.Options, store *core.StoreInfo) bool {
if f.TransferLeader && f.anyConditionMatch(leaderSource, opts, store) {
return false
}

if (isSource && !store.IsAvailable(storelimit.RemovePeer)) || (!isSource && !store.IsAvailable(storelimit.AddPeer)) {
if f.MoveRegion && f.anyConditionMatch(regionSource, opts, store) {
return false
}
return true
}

if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
// Target returns true when the store can be selected as the schedule
// target.
func (f StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
if f.TransferLeader && f.anyConditionMatch(leaderTarget, opts, store) {
return false
}
if f.MoveRegion && f.anyConditionMatch(regionTarget, opts, store) {
return false
}
return true
Expand Down
35 changes: 35 additions & 0 deletions server/schedule/filter/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ package filter

import (
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/v4/pkg/mock/mockcluster"
"github.com/pingcap/pd/v4/pkg/mock/mockoption"
"github.com/pingcap/pd/v4/server/core"
Expand Down Expand Up @@ -115,6 +117,39 @@ func (s *testFiltersSuite) TestRuleFitFilter(c *C) {
c.Assert(filter.Source(tc, tc.GetStore(4)), IsTrue)
}

func (s *testFiltersSuite) TestStoreStateFilter(c *C) {
filters := []Filter{
StoreStateFilter{TransferLeader: true},
StoreStateFilter{MoveRegion: true},
StoreStateFilter{TransferLeader: true, MoveRegion: true},
StoreStateFilter{MoveRegion: true, AllowTemporaryStates: true},
}
opt := mockoption.NewScheduleOptions()
store := core.NewStoreInfoWithLabel(1, 0, map[string]string{})

check := func(n int, store *core.StoreInfo, source, target bool) {
c.Assert(filters[n].Source(opt, store), Equals, source)
c.Assert(filters[n].Target(opt, store), Equals, target)
}
store = store.Clone(core.SetLastHeartbeatTS(time.Now()))
check(2, store, true, true)

// Disconn
store = store.Clone(core.SetLastHeartbeatTS(time.Now().Add(-5 * time.Minute)))
check(0, store, false, false)
check(1, store, true, false)
check(2, store, false, false)
check(3, store, true, true)

// Busy
store = store.Clone(core.SetLastHeartbeatTS(time.Now())).
Clone(core.SetStoreStats(&pdpb.StoreStats{IsBusy: true}))
check(0, store, true, false)
check(1, store, false, false)
check(2, store, false, false)
check(3, store, true, true)
}

func (s *testFiltersSuite) TestPlacementGuard(c *C) {
opt := mockoption.NewScheduleOptions()
opt.LocationLabels = []string{"zone"}
Expand Down

0 comments on commit af5511a

Please sign in to comment.