Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rule_checker: can replace unhealthPeer with orphanPeer #6831

Merged
merged 10 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
rule_checker: can replace unhealthPeer with orphanPeer
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Jul 21, 2023
commit cf1d5accdacb8e7210836e242ed00cc4a42645cf
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES)
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES) --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config revive.toml $(PACKAGES)

Expand Down
2 changes: 1 addition & 1 deletion client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run -c ../.golangci.yml --verbose ./...
@ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config ../revive.toml ./...

Expand Down
50 changes: 46 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var (
ruleCheckerMoveToBetterLocationCounter = checkerCounter.WithLabelValues(ruleChecker, "move-to-better-location")
ruleCheckerSkipRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "skip-remove-orphan-peer")
ruleCheckerRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "remove-orphan-peer")
ruleCheckerReplaceOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-orphan-peer")
)

// RuleChecker fix/improve region by placement rules.
Expand Down Expand Up @@ -426,14 +427,16 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
if len(fit.OrphanPeers) == 0 {
return nil, nil
}
var pinDownPeer *pdpb.PeerStats
isUnhealthyPeer := func(id uint64) bool {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
pinDownPeer = downPeer
return true
}
}
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
return true
}
}
Expand All @@ -455,11 +458,46 @@ loopFits:
}
}
}

// If hasUnhealthyFit is false, it is safe to delete the OrphanPeer.
if !hasUnhealthyFit {
ruleCheckerRemoveOrphanPeerCounter.Inc()
return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, fit.OrphanPeers[0].StoreId)
}

// try to use orphan peers to replace unhealthy down peers.
for _, orphanPeer := range fit.OrphanPeers {
if isUnhealthyPeer(orphanPeer.GetId()) {
continue
}
if pinDownPeer != nil {
nolouch marked this conversation as resolved.
Show resolved Hide resolved
// no consider witness in this path.
if pinDownPeer.GetPeer().GetIsWitness() || orphanPeer.GetIsWitness() {
continue
}
// store should be down.
if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetPeer().GetStoreId()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do nothing if the store-id is not exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will return false when calling isStoreDownTimeHitMaxDownTime.

continue
}
// check if down peer can replace with orphan peer.
ruleCheckerReplaceOrphanPeerCounter.Inc()
dstStore := c.cluster.GetStore(orphanPeer.GetStoreId())
if fit.Replace(pinDownPeer.GetPeer().GetStoreId(), dstStore) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waht it will do if the orphan peer is also down peer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will skip in the before.

destRole := pinDownPeer.GetPeer().Role
Copy link
Member

@rleungx rleungx Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using a variable for pinDownPeer.GetPeer() and using GetRole here?

orphanPeerRole := orphanPeer.GetRole()
switch {
case orphanPeerRole == metapb.PeerRole_Learner && destRole == metapb.PeerRole_Voter:
return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer.GetPeer())
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer.GetPeer())
default:
// destRole should not same with orphanPeerRole. if role is same, it fit with orphanPeer should be better than now.
// destRole never be leader, so we not consider it.
}
}
}
}

// If hasUnhealthyFit is true, try to remove unhealthy orphan peers only if number of OrphanPeers is >= 2.
// Ref https://github.com/tikv/pd/issues/4045
if len(fit.OrphanPeers) >= 2 {
Expand Down Expand Up @@ -498,6 +536,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) boo

func (c *RuleChecker) isStoreDownTimeHitMaxDownTime(storeID uint64) bool {
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return false
}
return store.DownTime() >= c.cluster.GetCheckerConfig().GetMaxStoreDownTime()
}

Expand Down
121 changes: 120 additions & 1 deletion pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ func (suite *ruleCheckerTestSuite) TestFixRuleWitness() {
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("add-rule-peer", op.Desc())
fmt.Println(op)
suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore)
suite.True(op.Step(0).(operator.AddLearner).IsWitness)
}
Expand Down Expand Up @@ -686,6 +685,126 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
suite.Equal("remove-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
r1 := suite.cluster.GetRegion(1)
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()

// set peer3 to pending and down
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}))
suite.cluster.PutRegion(r1)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Equal(uint64(3), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
suite.Equal(uint64(4), op.Step(0).(operator.ChangePeerV2Enter).PromoteLearners[0].ToStore)
suite.Equal(uint64(3), op.Step(1).(operator.ChangePeerV2Leave).DemoteVoters[0].ToStore)
suite.Equal(uint64(4), op.Step(1).(operator.ChangePeerV2Leave).PromoteLearners[0].ToStore)
suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole2() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
suite.cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5)
r1 := suite.cluster.GetRegion(1)

// set peer3 to pending and down, and peer 3 to learner, and store 3 is down
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
r1 = r1.Clone(core.WithLearners([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(
core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}),
core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}),
)
suite.cluster.PutRegion(r1)

// default and test group => 3 voter + 1 learner
err := suite.ruleManager.SetRule(&placement.Rule{
GroupID: "test",
ID: "10",
Role: placement.Learner,
Count: 1,
})
suite.NoError(err)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Equal(uint64(5), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
suite.Equal(uint64(3), op.Step(1).(operator.RemovePeer).FromStore)
suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthPeersAndTiFlash() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4", "engine": "tiflash"})
suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
rule := &placement.Rule{
GroupID: "pd",
ID: "test",
Role: placement.Voter,
Count: 3,
}
rule2 := &placement.Rule{
GroupID: "pd",
ID: "test2",
Role: placement.Learner,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
{
Key: "engine",
Op: placement.In,
Values: []string{"tiflash"},
},
},
}
suite.ruleManager.SetRule(rule)
suite.ruleManager.SetRule(rule2)
suite.ruleManager.DeleteRule("pd", "default")

r1 := suite.cluster.GetRegion(1)
// set peer3 to pending and down
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}))
suite.cluster.PutRegion(r1)
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()

op := suite.rc.Check(suite.cluster.GetRegion(1))
// should not promote tiflash peer
suite.Nil(op)

// scale a node, can replace the down peer
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("fast-replace-rule-down-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestIssue3293() {
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
Expand Down
5 changes: 4 additions & 1 deletion pkg/schedule/operator/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ func (b *Builder) Build(kind OpKind) (*Operator, error) {
if brief, b.err = b.prepareBuild(); b.err != nil {
return nil, b.err
}

if b.useJointConsensus {
kind, b.err = b.buildStepsWithJointConsensus(kind)
} else {
Expand Down Expand Up @@ -539,6 +538,10 @@ func (b *Builder) brief() string {
return fmt.Sprintf("%s: store %s to %s", op, b.toRemove, b.toAdd)
case len(b.toAdd) > 0:
return fmt.Sprintf("add peer: store %s", b.toAdd)
case len(b.toRemove) > 0 && len(b.toPromote) > 0:
return fmt.Sprintf("promote peer: store %s, rm peer: store %s", b.toRemove, b.toPromote)
case len(b.toRemove) > 0 && len(b.toDemote) > 0:
return fmt.Sprintf("demote peer: store %s, rm peer: store %s", b.toDemote, b.toRemove)
case len(b.toRemove) > 0:
return fmt.Sprintf("rm peer: store %s", b.toRemove)
case len(b.toPromote) > 0:
Expand Down
19 changes: 19 additions & 0 deletions pkg/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,25 @@ func CreatePromoteLearnerOperator(desc string, ci sche.SharedCluster, region *co
Build(0)
}

// CreatePromoteLearnerOperatorAndRemovePeer creates an operator that promotes a learner and removes a peer.
func CreatePromoteLearnerOperatorAndRemovePeer(desc string, ci sche.SharedCluster, region *core.RegionInfo, toPromote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
PromoteLearner(toPromote.GetStoreId()).
RemovePeer(toRemove.GetStoreId()).
Build(0)
}

// CreateDemoteLearnerOperatorAndRemovePeer creates an operator that demotes a learner and removes a peer.
func CreateDemoteLearnerOperatorAndRemovePeer(desc string, ci sche.SharedCluster, region *core.RegionInfo, toDemote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
if !ci.GetSharedConfig().IsUseJointConsensus() {
return nil, errors.Errorf("cannot build demote learner operator for region which is not in joint state")
nolouch marked this conversation as resolved.
Show resolved Hide resolved
}
return NewBuilder(desc, ci, region).
DemoteVoter(toDemote.GetStoreId()).
RemovePeer(toRemove.GetStoreId()).
Build(0)
}

// CreateRemovePeerOperator creates an operator that removes a peer from region.
func CreateRemovePeerOperator(desc string, ci sche.SharedCluster, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
return NewBuilder(desc, ci, region).
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (o *Operator) Sync(other *Operator) {
func (o *Operator) String() string {
stepStrs := make([]string, len(o.steps))
for i := range o.steps {
stepStrs[i] = o.steps[i].String()
stepStrs[i] = fmt.Sprintf("step-%d:{%s}", i, o.steps[i].String())
nolouch marked this conversation as resolved.
Show resolved Hide resolved
}
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s], timeout:[%s])",
o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
Expand Down
3 changes: 3 additions & 0 deletions pkg/schedule/placement/fit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type RegionFit struct {

// Replace return true if the replacement store is fit all constraints and isolation score is not less than the origin.
func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
if dstStore == nil {
return false
}
fit := f.getRuleFitByStoreID(srcStoreID)
// check the target store is fit all constraints.
if fit == nil {
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./...
@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./...

Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./...
@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./...

Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./...
@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./...

Expand Down
Loading