Skip to content

Commit

Permalink
schedule: check joint state when creating split/merge operator (tikv#…
Browse files Browse the repository at this point in the history
…3148)

* schedule: check joint state when creating split operator

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* schedule: check joint state when creating merge operator

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* add log

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* add split unit test

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

* add merge region unit test

Signed-off-by: Zheng Xiangsheng <hundundm@gmail.com>

Co-authored-by: Ti Prow Robot <71242396+ti-community-prow-bot@users.noreply.github.com>
  • Loading branch information
HunDunDM and ti-chi-bot authored Dec 4, 2020
1 parent 8781595 commit 3f28c56
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 7 deletions.
6 changes: 5 additions & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,11 @@ func (h *Handler) AddSplitRegionOperator(regionID uint64, policyStr string, keys
}
}

op := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys)
op, err := operator.CreateSplitRegionOperator("admin-split-region", region, operator.OpAdmin, pdpb.CheckPolicy(policy), splitKeys)
if err != nil {
return err
}

if ok := c.GetOperatorController().AddOperator(op); !ok {
return errors.WithStack(ErrAddOperator)
}
Expand Down
4 changes: 2 additions & 2 deletions server/schedule/checker/joint_state_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ package checker

import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"go.uber.org/zap"
)

// JointStateChecker ensures region is in joint state will leave.
Expand All @@ -42,7 +42,7 @@ func (c *JointStateChecker) Check(region *core.RegionInfo) *operator.Operator {
op, err := operator.CreateLeaveJointStateOperator("leave-joint-state", c.cluster, region)
if err != nil {
checkerCounter.WithLabelValues("joint_state_checker", "create-operator-fail").Inc()
log.Debug("fail to create leave joint state operator", zap.Error(err))
log.Debug("fail to create leave joint state operator", errs.ZapError(err))
return nil
} else if op != nil {
checkerCounter.WithLabelValues("joint_state_checker", "new-operator").Inc()
Expand Down
9 changes: 8 additions & 1 deletion server/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,14 @@ func (c *RuleChecker) fixRange(region *core.RegionInfo) *operator.Operator {
if len(keys) == 0 {
return nil
}
return operator.CreateSplitRegionOperator("rule-split-region", region, 0, pdpb.CheckPolicy_USEKEY, keys)

op, err := operator.CreateSplitRegionOperator("rule-split-region", region, 0, pdpb.CheckPolicy_USEKEY, keys)
if err != nil {
log.Debug("create split region operator failed", errs.ZapError(err))
return nil
}

return op
}

func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.RegionFit, rf *placement.RuleFit) (*operator.Operator, error) {
Expand Down
12 changes: 10 additions & 2 deletions server/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func CreateMoveLeaderOperator(desc string, cluster opt.Cluster, region *core.Reg
}

// CreateSplitRegionOperator creates an operator that splits a region.
func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind, policy pdpb.CheckPolicy, keys [][]byte) *Operator {
func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind, policy pdpb.CheckPolicy, keys [][]byte) (*Operator, error) {
if core.IsInJointState(region.GetPeers()...) {
return nil, errors.Errorf("cannot split region which is in joint state")
}

step := SplitRegion{
StartKey: region.GetStartKey(),
EndKey: region.GetEndKey(),
Expand All @@ -110,11 +114,15 @@ func CreateSplitRegionOperator(desc string, region *core.RegionInfo, kind OpKind
}
brief += fmt.Sprintf(" and keys %v", hexKeys)
}
return NewOperator(desc, brief, region.GetID(), region.GetRegionEpoch(), kind|OpSplit, step)
return NewOperator(desc, brief, region.GetID(), region.GetRegionEpoch(), kind|OpSplit, step), nil
}

// CreateMergeRegionOperator creates an operator that merge two region into one.
func CreateMergeRegionOperator(desc string, cluster opt.Cluster, source *core.RegionInfo, target *core.RegionInfo, kind OpKind) ([]*Operator, error) {
if core.IsInJointState(source.GetPeers()...) || core.IsInJointState(target.GetPeers()...) {
return nil, errors.Errorf("cannot merge regions which are in joint state")
}

var steps []OpStep
if !isRegionMatch(source, target) {
peers := make(map[uint64]*metapb.Peer)
Expand Down
224 changes: 224 additions & 0 deletions server/schedule/operator/create_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"

"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/server/config"
Expand Down Expand Up @@ -53,6 +54,229 @@ func (s *testCreateOperatorSuite) SetUpTest(c *C) {
s.cluster.AddLabelsStore(10, 0, map[string]string{"zone": "z3", "host": "h1", "noleader": "true"})
}

func (s *testCreateOperatorSuite) TestCreateSplitRegionOperator(c *C) {
type testCase struct {
startKey []byte
endKey []byte
originPeers []*metapb.Peer // first is leader
policy pdpb.CheckPolicy
keys [][]byte
expectedError bool
}
cases := []testCase{
{
startKey: []byte("a"),
endKey: []byte("b"),
originPeers: []*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_Voter},
},
policy: pdpb.CheckPolicy_APPROXIMATE,
expectedError: false,
},
{
startKey: []byte("c"),
endKey: []byte("d"),
originPeers: []*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_Voter},
},
policy: pdpb.CheckPolicy_SCAN,
expectedError: false,
},
{
startKey: []byte("e"),
endKey: []byte("h"),
originPeers: []*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_Voter},
},
policy: pdpb.CheckPolicy_USEKEY,
keys: [][]byte{[]byte("f"), []byte("g")},
expectedError: false,
},
{
startKey: []byte("i"),
endKey: []byte("j"),
originPeers: []*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_IncomingVoter},
},
policy: pdpb.CheckPolicy_APPROXIMATE,
expectedError: true,
},
{
startKey: []byte("k"),
endKey: []byte("l"),
originPeers: []*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_DemotingVoter},
},
policy: pdpb.CheckPolicy_APPROXIMATE,
expectedError: true,
},
}

for _, tc := range cases {
region := core.NewRegionInfo(&metapb.Region{
Id: 1,
StartKey: tc.startKey,
EndKey: tc.endKey,
Peers: tc.originPeers,
}, tc.originPeers[0])
op, err := CreateSplitRegionOperator("test", region, 0, tc.policy, tc.keys)
if tc.expectedError {
c.Assert(err, NotNil)
continue
}
c.Assert(err, IsNil)
c.Assert(op.Kind(), Equals, OpSplit)
c.Assert(len(op.steps), Equals, 1)
for i := 0; i < op.Len(); i++ {
switch step := op.Step(i).(type) {
case SplitRegion:
c.Assert(step.StartKey, DeepEquals, tc.startKey)
c.Assert(step.EndKey, DeepEquals, tc.endKey)
c.Assert(step.Policy, Equals, tc.policy)
c.Assert(step.SplitKeys, DeepEquals, tc.keys)
default:
c.Errorf("unexpected type: %s", step.String())
}
}
}
}

func (s *testCreateOperatorSuite) TestCreateMergeRegionOperator(c *C) {
type testCase struct {
sourcePeers []*metapb.Peer // first is leader
targetPeers []*metapb.Peer // first is leader
kind OpKind
expectedError bool
prepareSteps []OpStep
}
cases := []testCase{
{
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
},
[]*metapb.Peer{
{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 4, StoreId: 2, Role: metapb.PeerRole_Voter},
},
OpMerge,
false,
[]OpStep{},
},
{
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
},
[]*metapb.Peer{
{Id: 4, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_Voter},
},
OpMerge | OpLeader | OpRegion,
false,
[]OpStep{
AddLearner{ToStore: 3},
TransferLeader{FromStore: 1, ToStore: 2},
ChangePeerV2Enter{
PromoteLearners: []PromoteLearner{{ToStore: 3}},
DemoteVoters: []DemoteVoter{{ToStore: 1}},
},
ChangePeerV2Leave{
PromoteLearners: []PromoteLearner{{ToStore: 3}},
DemoteVoters: []DemoteVoter{{ToStore: 1}},
},
RemovePeer{FromStore: 1},
},
},
{
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_DemotingVoter},
},
[]*metapb.Peer{
{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 4, StoreId: 2, Role: metapb.PeerRole_Voter},
},
0,
true,
nil,
},
{
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
},
[]*metapb.Peer{
{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 4, StoreId: 2, Role: metapb.PeerRole_IncomingVoter},
},
0,
true,
nil,
},
}

for _, tc := range cases {
source := core.NewRegionInfo(&metapb.Region{Id: 68, Peers: tc.sourcePeers}, tc.sourcePeers[0])
target := core.NewRegionInfo(&metapb.Region{Id: 86, Peers: tc.targetPeers}, tc.targetPeers[0])
ops, err := CreateMergeRegionOperator("test", s.cluster, source, target, 0)
if tc.expectedError {
c.Assert(err, NotNil)
continue
}
c.Assert(err, IsNil)
c.Assert(len(ops), Equals, 2)
c.Assert(ops[0].kind, Equals, tc.kind)
c.Assert(ops[0].Len(), Equals, len(tc.prepareSteps)+1)
c.Assert(ops[1].kind, Equals, tc.kind)
c.Assert(ops[1].Len(), Equals, 1)
c.Assert(ops[1].Step(0).(MergeRegion), DeepEquals, MergeRegion{source.GetMeta(), target.GetMeta(), true})

expectedSteps := append(tc.prepareSteps, MergeRegion{source.GetMeta(), target.GetMeta(), false})
for i := 0; i < ops[0].Len(); i++ {
switch step := ops[0].Step(i).(type) {
case TransferLeader:
c.Assert(step.FromStore, Equals, expectedSteps[i].(TransferLeader).FromStore)
c.Assert(step.ToStore, Equals, expectedSteps[i].(TransferLeader).ToStore)
case AddLearner:
c.Assert(step.ToStore, Equals, expectedSteps[i].(AddLearner).ToStore)
case RemovePeer:
c.Assert(step.FromStore, Equals, expectedSteps[i].(RemovePeer).FromStore)
case ChangePeerV2Enter:
c.Assert(len(step.PromoteLearners), Equals, len(expectedSteps[i].(ChangePeerV2Enter).PromoteLearners))
c.Assert(len(step.DemoteVoters), Equals, len(expectedSteps[i].(ChangePeerV2Enter).DemoteVoters))
for j, p := range expectedSteps[i].(ChangePeerV2Enter).PromoteLearners {
c.Assert(step.PromoteLearners[j].ToStore, Equals, p.ToStore)
}
for j, d := range expectedSteps[i].(ChangePeerV2Enter).DemoteVoters {
c.Assert(step.DemoteVoters[j].ToStore, Equals, d.ToStore)
}
case ChangePeerV2Leave:
c.Assert(len(step.PromoteLearners), Equals, len(expectedSteps[i].(ChangePeerV2Leave).PromoteLearners))
c.Assert(len(step.DemoteVoters), Equals, len(expectedSteps[i].(ChangePeerV2Leave).DemoteVoters))
for j, p := range expectedSteps[i].(ChangePeerV2Leave).PromoteLearners {
c.Assert(step.PromoteLearners[j].ToStore, Equals, p.ToStore)
}
for j, d := range expectedSteps[i].(ChangePeerV2Leave).DemoteVoters {
c.Assert(step.DemoteVoters[j].ToStore, Equals, d.ToStore)
}
case MergeRegion:
c.Assert(step, DeepEquals, expectedSteps[i].(MergeRegion))
}
}
}
}

func (s *testCreateOperatorSuite) TestCreateLeaveJointStateOperator(c *C) {
type testCase struct {
originPeers []*metapb.Peer // first is leader
Expand Down
6 changes: 5 additions & 1 deletion server/schedule/region_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ type splitRegionsHandler struct {
}

func (h *splitRegionsHandler) SplitRegionByKeys(region *core.RegionInfo, splitKeys [][]byte) error {
op := operator.CreateSplitRegionOperator("region-splitter", region, 0, pdpb.CheckPolicy_USEKEY, splitKeys)
op, err := operator.CreateSplitRegionOperator("region-splitter", region, 0, pdpb.CheckPolicy_USEKEY, splitKeys)
if err != nil {
return err
}

if ok := h.oc.AddOperator(op); !ok {
log.Warn("add region split operator failed", zap.Uint64("region-id", region.GetID()))
return errors.New("add region split operator failed")
Expand Down

0 comments on commit 3f28c56

Please sign in to comment.