Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions pkg/schedule/operator/status_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,12 @@ func (o *Operator) SetAdditionalInfo(key string, value string) {
o.additionalInfos.value[key] = value
}

// GetAdditionalInfo returns additional info with key.
func (o *Operator) GetAdditionalInfo(key string) string {
// GetAdditionalInfo returns additional info value with key.
func (o *Operator) GetAdditionalInfo(key string) (string, bool) {
o.additionalInfos.RLock()
defer o.additionalInfos.RUnlock()
return o.additionalInfos.value[key]
val, exist := o.additionalInfos.value[key]
return val, exist
}

// LogAdditionalInfo returns additional info with string
Expand All @@ -178,18 +179,23 @@ func (o *Operator) LogAdditionalInfo() string {
// HasRelatedMergeRegion checks if the operator has a related merge region.
// All merge operators (OpMerge and OpAffinity) have this info set.
// It is safe to call on a nil *Operator, in which case it returns false.
func (o *Operator) HasRelatedMergeRegion() bool {
	// The nil guard must come before the lookup: GetAdditionalInfo locks
	// o.additionalInfos and would panic on a nil receiver.
	if o == nil {
		return false
	}
	val, exist := o.GetAdditionalInfo(string(RelatedMergeRegion))
	// An entry that exists but is empty does not count as a related region.
	return exist && val != ""
}

// GetRelatedMergeRegion returns the related merge region ID.
func (o *Operator) GetRelatedMergeRegion() uint64 {
if !o.HasRelatedMergeRegion() {
return 0
}
str := o.GetAdditionalInfo(string(RelatedMergeRegion))
str, exist := o.GetAdditionalInfo(string(RelatedMergeRegion))
if !exist {
log.Debug("not found related merge region ID")
return 0
}
relatedID, err := strconv.ParseUint(str, 10, 64)
if err != nil {
log.Warn("invalid related merge region ID",
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/operator/status_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func TestAdditionalInfoConcurrent(t *testing.T) {
key := fmt.Sprintf("key%d", i)
value := fmt.Sprintf("value%d", i)
op.SetAdditionalInfo(key, value)
if op.GetAdditionalInfo(key) != value {
val, ok := op.GetAdditionalInfo(key)
if ok && val != value {
t.Errorf("unexpected value for key %s", key)
}
}(i)
Expand Down
41 changes: 36 additions & 5 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -42,8 +43,10 @@ import (
// regionScatterName is the action scope set on the store-state filter that
// newEngineContext appends to its filter functions.
const regionScatterName = "region-scatter"

var (
gcInterval = time.Minute
gcTTL = time.Minute * 3
gcInterval = time.Minute
gcTTL = time.Minute * 3
operatorPriorityLevel = constant.High

// WithLabelValues is a heavy operation, define variable to avoid call it every time.
scatterSkipEmptyRegionCounter = scatterCounter.WithLabelValues("skip", "empty-region")
scatterSkipNoRegionCounter = scatterCounter.WithLabelValues("skip", "no-region")
Expand All @@ -54,14 +57,20 @@ var (
scatterUnnecessaryCounter = scatterCounter.WithLabelValues("unnecessary", "")
scatterFailCounter = scatterCounter.WithLabelValues("fail", "")
scatterSuccessCounter = scatterCounter.WithLabelValues("success", "")
<<<<<<< HEAD
errRegionNotFound = errors.New("region not found")
errEmptyRegion = errors.New("empty region")
=======
scatterOperatorRunningCounter = scatterCounter.WithLabelValues("skip", "running")
scatterOperatorExistedCounter = scatterCounter.WithLabelValues("fail", "other-existed")
>>>>>>> 9fe5653625 (scatter:check operator exist before scatter (#10082))
)

// Timing limits and identifiers used by the region scatterer.
const (
// maxSleepDuration is one minute.
// NOTE(review): presumably the cap of a retry backoff — usage is not visible here; confirm.
maxSleepDuration = time.Minute
// initialSleepDuration is 100ms.
// NOTE(review): presumably the starting backoff interval — usage is not visible here; confirm.
initialSleepDuration = 100 * time.Millisecond
// maxRetryLimit is 30.
// NOTE(review): presumably the maximum number of retries — usage is not visible here; confirm.
maxRetryLimit = 30
// scatterOperatorDesc is the description passed to CreateScatterRegionOperator
// and the substring Scatter looks for in an existing operator's Desc() to
// recognize a scatter operator that is already running.
scatterOperatorDesc = "scatter-region"
)

type selectedStores struct {
Expand Down Expand Up @@ -156,7 +165,7 @@ type engineContext struct {

func newEngineContext(ctx context.Context, filterFuncs ...filterFunc) engineContext {
filterFuncs = append(filterFuncs, func() filter.Filter {
return &filter.StoreStateFilter{ActionScope: regionScatterName, MoveRegion: true, ScatterRegion: true, OperatorLevel: constant.High}
return &filter.StoreStateFilter{ActionScope: regionScatterName, MoveRegion: true, ScatterRegion: true, OperatorLevel: operatorPriorityLevel}
})
return engineContext{
filterFuncs: filterFuncs,
Expand Down Expand Up @@ -287,6 +296,28 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string, skipSto
return nil, errors.Errorf("region %d is not fully replicated", region.GetID())
}

// Check whether the region already has an operator.
// If the existing operator's priority level is higher than or equal to the scatter operator's level, give up creating a new scatter operator.
// Otherwise, create a new scatter operator to replace the existing one.
if op := r.opController.GetOperator(region.GetID()); op != nil && op.GetPriorityLevel() >= operatorPriorityLevel {
val, exist := op.GetAdditionalInfo("group")
// If the existing operator is created by the same group scatterer, just skip creating a new one.
if strings.Contains(op.Desc(), scatterOperatorDesc) && exist && val == group {
scatterOperatorRunningCounter.Inc()
log.Debug("scatter operator is already running",
zap.Uint64("region-id", region.GetID()))
return nil, nil
}
scatterOperatorExistedCounter.Inc()
log.Debug("the operator exist, but it does not meet requirement",
zap.Uint64("region-id", region.GetID()),
zap.String("additional-info-group", val),
zap.String("operator-des", op.Desc()),
zap.Bool("group-exist", exist),
)
return nil, errors.Errorf("the operator of region %d already exist", region.GetID())
}

if region.GetLeader() == nil {
scatterSkipNoLeaderCounter.Inc()
log.Warn("region no leader during scatter", zap.Uint64("region-id", region.GetID()))
Expand Down Expand Up @@ -405,7 +436,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
r.Put(targetPeers, targetLeader, group)
return nil, nil
}
op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader, skipStoreLimit)
op, err := operator.CreateScatterRegionOperator(scatterOperatorDesc, r.cluster, region, targetPeers, targetLeader, skipStoreLimit)
if err != nil {
scatterFailCounter.Inc()
for _, peer := range region.GetPeers() {
Expand All @@ -420,7 +451,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
r.Put(targetPeers, targetLeader, group)
op.SetAdditionalInfo("group", group)
op.SetAdditionalInfo("leader-picked-count", strconv.FormatUint(leaderStorePickedCount, 10))
op.SetPriorityLevel(constant.High)
op.SetPriorityLevel(operatorPriorityLevel)
}
return op, nil
}
Expand Down
30 changes: 29 additions & 1 deletion pkg/schedule/scatter/region_scatterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/mock/mockconfig"
Expand Down Expand Up @@ -681,7 +682,9 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
re.NoError(err)
re.False(isPeerCountChanged(op))
if op != nil {
re.Equal(group, op.GetAdditionalInfo("group"))
val, exist := op.GetAdditionalInfo("group")
re.True(exist)
re.Equal(group, val)
}
}
}
Expand Down Expand Up @@ -838,6 +841,31 @@ func TestRemoveStoreLimit(t *testing.T) {
re.True(oc.AddOperator(op))
}
}

// same scatter operator should be skipped
region := tc.GetRegion(2)
op, err := scatterer.Scatter(region, "", true)
re.NoError(err)
re.Nil(op)

// a scatter request from a different group should be rejected with an error
region = tc.GetRegion(3)
op, err = scatterer.Scatter(region, "test", true)
re.Error(err)
re.Nil(op)

// an existing lower-priority operator does not block creating a scatter operator
regionID := uint64(5)
op = oc.GetOperator(regionID)
re.NotNil(op)
re.True(oc.RemoveOperator(op))
region = tc.GetRegion(regionID)
op = operator.NewTestOperator(region.GetID(), region.GetRegionEpoch(), operator.OpRegion)
op.SetPriorityLevel(constant.Low)
re.True(oc.AddOperator(op))
op, err = scatterer.Scatter(region, "", true)
re.NoError(err)
re.NotNil(op)
}

func TestScatterWithAffinity(t *testing.T) {
Expand Down
26 changes: 18 additions & 8 deletions pkg/schedule/schedulers/balance_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ import (
"github.com/tikv/pd/pkg/utils/keyutil"
)

// checkOperator asserts that op carries both the "sourceScore" and
// "targetScore" additional-info entries and that they equal the expected
// values. Source is checked before target, presence before value.
func checkOperator(re *require.Assertions, op *operator.Operator, sourceScore string, targetScore string) {
	expectations := []struct {
		key  string
		want string
	}{
		{key: "sourceScore", want: sourceScore},
		{key: "targetScore", want: targetScore},
	}
	for _, e := range expectations {
		got, found := op.GetAdditionalInfo(e.key)
		re.True(found)
		re.Equal(e.want, got)
	}
}

func TestPlacementRule(t *testing.T) {
re := require.New(t)
cancel, _, tc, oc := prepareSchedulersTest()
Expand Down Expand Up @@ -221,8 +230,7 @@ func TestTIKVEngine(t *testing.T) {
ops, _ = scheduler.Schedule(tc, true)
re.NotEmpty(ops)
op := ops[0]
re.Equal("3.00", op.GetAdditionalInfo("sourceScore"))
re.Equal("0.00", op.GetAdditionalInfo("targetScore"))
checkOperator(re, op, "3.00", "0.00")
re.Contains(op.Brief(), "transfer leader: store 1 to 3")

// case2: move leader from store 1 to store 4
Expand All @@ -232,8 +240,7 @@ func TestTIKVEngine(t *testing.T) {
ops, _ = scheduler.Schedule(tc, true)
re.NotEmpty(ops)
op = ops[0]
re.Equal("3.00", op.GetAdditionalInfo("sourceScore"))
re.Equal("0.00", op.GetAdditionalInfo("targetScore"))
checkOperator(re, op, "3.00", "0.00")
re.Contains(op.Brief(), "mv peer: store [1] to [4]")
re.Equal("transfer leader from store 1 to store 4", op.Step(2).String())
}
Expand Down Expand Up @@ -372,10 +379,13 @@ func TestTIFLASHEngine(t *testing.T) {
ops, _ = scheduler.Schedule(tc, false)
re.NotEmpty(ops)
op := ops[0]
re.Equal("3.00", op.GetAdditionalInfo("sourceScore"))
re.Equal("0.00", op.GetAdditionalInfo("targetScore"))
re.Equal("1.00", op.GetAdditionalInfo("sourceExpectScore"))
re.Equal("1.00", op.GetAdditionalInfo("targetExpectScore"))
checkOperator(re, op, "3.00", "0.00")
sourceExpectScore, exist := op.GetAdditionalInfo("sourceExpectScore")
re.True(exist)
re.Equal("1.00", sourceExpectScore)
targetExpectScore, exist := op.GetAdditionalInfo("targetExpectScore")
re.True(exist)
re.Equal("1.00", targetExpectScore)
re.Contains(op.Brief(), "mv peer: store [4] to")
}

Expand Down
Loading