Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#4347
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
disksing authored and ti-chi-bot committed Nov 24, 2021
1 parent b2c3de6 commit df54757
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 0 deletions.
132 changes: 132 additions & 0 deletions server/schedule/checker/replica_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package checker

import (
"github.com/pingcap/log"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/opt"
"go.uber.org/zap"
)

// ReplicaStrategy collects some utilities to manipulate region peers. It
// exists to allow replica_checker and rule_checker to reuse common logic.
//
// The Select* methods return a store ID, with 0 meaning that no suitable
// store was found.
type ReplicaStrategy struct {
// checkerName is passed to the filters as their scope/action-scope name
// (replica-checker / rule-checker).
checkerName string // replica-checker / rule-checker
// cluster supplies the candidate stores (GetStores/GetStore) and the
// scheduling options (GetOpts) used by the filters and comparers.
cluster opt.Cluster
// locationLabels are handed to the isolation comparer, the isolation filter
// and the location improver.
locationLabels []string
// isolationLevel, when non-empty (and locationLabels is non-empty), enables
// the isolation filter.
isolationLevel string
// region is the region being operated on; its current store IDs are excluded
// when choosing a store to add.
region *core.RegionInfo
// extraFilters are appended to the filter list on every SelectStoreToAdd call.
extraFilters []filter.Filter
}

// SelectStoreToAdd picks the store that should receive a new replica of the
// region and returns its ID, or 0 when no candidate survives the filters.
//
// `coLocationStores` are the stores whose locations the target store is
// compared against.
// `extraFilters` lets the caller add further constraints that depend on the
// context of the call.
//
// For example, when choosing a store to replace one of the region's peers,
// the caller can pass the peer list without the replaced peer as
// `coLocationStores`, and supply additional filters so that the replacement
// cannot lower the isolation level.
func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, extraFilters ...filter.Filter) uint64 {
	// Selection runs in two stages. Stage one tolerates temporary store
	// states and keeps only the stores with the best location-label score.
	// Stage two then takes temporary states and capacity into account to
	// choose the final target.
	//
	// Doing it this way avoids settling for a worse placement merely because
	// of a short-lived store state, which would cause redundant scheduling
	// later on.
	looseFilters := []filter.Filter{
		filter.NewExcludedFilter(s.checkerName, nil, s.region.GetStoreIds()),
		filter.NewStorageThresholdFilter(s.checkerName),
		filter.NewSpecialUseFilter(s.checkerName),
		&filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowTemporaryStates: true},
	}
	if s.isolationLevel != "" && len(s.locationLabels) > 0 {
		looseFilters = append(looseFilters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores))
	}
	// Appending an empty slice is a no-op, so no length checks are required.
	looseFilters = append(looseFilters, extraFilters...)
	looseFilters = append(looseFilters, s.extraFilters...)

	byIsolation := filter.IsolationComparer(s.locationLabels, coLocationStores)
	strictState := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true}

	// Stage one: keep the stores with the greatest isolation score.
	bestIsolated := filter.NewCandidates(s.cluster.GetStores()).
		FilterTarget(s.cluster.GetOpts(), looseFilters...).
		Sort(byIsolation).Reverse().Top(byIsolation)

	// Stage two: prefer the lowest region score, then apply the state filter
	// that does not ignore temporary states.
	picked := bestIsolated.
		Sort(filter.RegionScoreComparer(s.cluster.GetOpts())).
		FilterTarget(s.cluster.GetOpts(), strictState).
		PickFirst()
	if picked != nil {
		return picked.GetID()
	}
	return 0
}

// SelectStoreToFix returns a store to replace down/offline old peer. The location
// placement after scheduling is allowed to be worse than original.
//
// `coLocationStores` is reordered in place so that the store with ID `old`
// comes first; the remaining stores are then used as the co-location set for
// SelectStoreToAdd. It returns the chosen store ID, or 0 if `coLocationStores`
// is empty or no store qualifies.
//
// NOTE(review): if `old` is not present in `coLocationStores`, the order is
// left unchanged and the first store is still dropped from the co-location
// set — confirm callers always include `old`.
func (s *ReplicaStrategy) SelectStoreToFix(coLocationStores []*core.StoreInfo, old uint64) uint64 {
	// Slicing an empty slice with [1:] would panic, so bail out early.
	if len(coLocationStores) == 0 {
		return 0
	}
	// trick to avoid creating a slice with `old` removed.
	s.swapStoreToFirst(coLocationStores, old)
	return s.SelectStoreToAdd(coLocationStores[1:])
}

// SelectStoreToImprove returns a store to replace oldStore. The location
// placement after scheduling should be better than original.
//
// `coLocationStores` is reordered in place so that the store with ID `old`
// comes first. It returns the chosen store ID, or 0 if `coLocationStores` is
// empty, if the cluster no longer knows the store `old`, or if no store
// qualifies.
func (s *ReplicaStrategy) SelectStoreToImprove(coLocationStores []*core.StoreInfo, old uint64) uint64 {
	// Slicing an empty slice with [1:] would panic, so bail out early.
	if len(coLocationStores) == 0 {
		return 0
	}
	// trick to avoid creating a slice with `old` removed.
	s.swapStoreToFirst(coLocationStores, old)
	oldStore := s.cluster.GetStore(old)
	if oldStore == nil {
		return 0
	}
	// The location improver receives the full store list plus the store being
	// replaced; the isolation filter only sees the stores that remain.
	filters := []filter.Filter{
		filter.NewLocationImprover(s.checkerName, s.locationLabels, coLocationStores, oldStore),
	}
	if len(s.locationLabels) > 0 && s.isolationLevel != "" {
		filters = append(filters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores[1:]))
	}
	return s.SelectStoreToAdd(coLocationStores[1:], filters...)
}

// swapStoreToFirst moves the first store whose ID equals id to index 0 by
// swapping it with the current first element. The slice is modified in place;
// when no store matches, it is left untouched.
func (s *ReplicaStrategy) swapStoreToFirst(stores []*core.StoreInfo, id uint64) {
	for idx := range stores {
		if stores[idx].GetID() != id {
			continue
		}
		stores[0], stores[idx] = stores[idx], stores[0]
		return
	}
}

// SelectStoreToRemove returns the best option to remove from the region.
//
// Candidates are taken from `coLocationStores`, filtered as move-region
// sources, narrowed to those ranked first by the isolation comparer, and then
// ordered by region score in reverse so that the first pick is the one the
// region-score comparer ranks last. It returns the chosen store ID, or 0 when
// no store can be removed.
func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo) uint64 {
	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
	// Use the strategy's own checker name as the filter scope, as the other
	// methods do, so the filter is attributed to whichever checker owns this
	// strategy.
	stateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true}
	source := filter.NewCandidates(coLocationStores).
		FilterSource(s.cluster.GetOpts(), stateFilter).
		Sort(isolationComparer).Top(isolationComparer).
		Sort(filter.RegionScoreComparer(s.cluster.GetOpts())).Reverse().
		PickFirst()
	if source == nil {
		log.Debug("no removable store", zap.Uint64("region-id", s.region.GetID()))
		return 0
	}
	return source.GetID()
}
21 changes: 21 additions & 0 deletions server/schedule/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
// Group peers by the engine of their stores
for _, peer := range region.GetPeers() {
store := r.cluster.GetStore(peer.GetStoreId())
<<<<<<< HEAD
if ordinaryFilter.Target(r.cluster, store) {
=======
if store == nil {
return nil
}
if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
>>>>>>> 63c46a1e8 (*: check if GetStore returns nil (#4347))
ordinaryPeers[peer.GetId()] = peer
} else {
engine := store.GetLabelValue(filter.EngineKey)
Expand Down Expand Up @@ -415,7 +422,14 @@ func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[ui
leaderCandidateStores := make([]uint64, 0)
for storeID := range peers {
store := r.cluster.GetStore(storeID)
<<<<<<< HEAD
engine := store.GetLabelValue(filter.EngineKey)
=======
if store == nil {
return 0
}
engine := store.GetLabelValue(core.EngineKey)
>>>>>>> 63c46a1e8 (*: check if GetStore returns nil (#4347))
if len(engine) < 1 {
leaderCandidateStores = append(leaderCandidateStores, storeID)
}
Expand All @@ -439,7 +453,14 @@ func (r *RegionScatterer) Put(peers map[uint64]*metapb.Peer, leaderStoreID uint6
for _, peer := range peers {
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
<<<<<<< HEAD
if ordinaryFilter.Target(r.cluster, store) {
=======
if store == nil {
continue
}
if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
>>>>>>> 63c46a1e8 (*: check if GetStore returns nil (#4347))
r.ordinaryEngine.selectedPeer.Put(storeID, group)
scatterDistributionCounter.WithLabelValues(
fmt.Sprintf("%v", storeID),
Expand Down
8 changes: 8 additions & 0 deletions server/schedulers/shuffle_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,20 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster opt.Cluster) (*core.
}

func (s *shuffleRegionScheduler) scheduleAddPeer(cluster opt.Cluster, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
<<<<<<< HEAD
var scoreGuard filter.Filter
if cluster.IsPlacementRulesEnabled() {
scoreGuard = filter.NewRuleFitFilter(s.GetName(), cluster, region, oldPeer.GetStoreId())
} else {
scoreGuard = filter.NewDistinctScoreFilter(s.GetName(), cluster.GetLocationLabels(), cluster.GetRegionStores(region), cluster.GetStore(oldPeer.GetStoreId()))
}
=======
store := cluster.GetStore(oldPeer.GetStoreId())
if store == nil {
return nil
}
scoreGuard := filter.NewPlacementSafeguard(s.GetName(), cluster, region, store)
>>>>>>> 63c46a1e8 (*: check if GetStore returns nil (#4347))
excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIds())

stores := cluster.GetStores()
Expand Down
5 changes: 5 additions & 0 deletions server/statistics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,12 @@ func (s *StoresStats) FilterUnhealthyStore(cluster core.StoreSetInformer) {
s.Lock()
defer s.Unlock()
for storeID := range s.rollingStoresStats {
<<<<<<< HEAD
if s.storeIsUnhealthy(cluster, storeID) {
=======
store := cluster.GetStore(storeID)
if store == nil || store.IsTombstone() || store.IsUnhealthy() || store.IsPhysicallyDestroyed() {
>>>>>>> 63c46a1e8 (*: check if GetStore returns nil (#4347))
delete(s.rollingStoresStats, storeID)
}
}
Expand Down
48 changes: 48 additions & 0 deletions server/statistics/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2021 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/server/core"
)

// Register testStoreSuite with the check framework (dot-imported above) so
// that its test methods are picked up.
var _ = Suite(&testStoreSuite{})

// testStoreSuite holds the tests for StoresStats; it carries no state.
type testStoreSuite struct{}

// TestFilterUnhealtyStore registers five stores with fresh heartbeats, then
// makes store 1 stale (last heartbeat 24 hours ago), tombstones store 2 and
// deletes store 3 from the cluster. After FilterUnhealthyStore only the loads
// of stores 4 and 5 are expected to remain.
func (s *testStoreSuite) TestFilterUnhealtyStore(c *C) {
	const storeCount = 5

	storesStats := NewStoresStats()
	basicCluster := core.NewBasicCluster()
	for id := uint64(1); id <= storeCount; id++ {
		meta := &metapb.Store{Id: id}
		basicCluster.PutStore(core.NewStoreInfo(meta, core.SetLastHeartbeatTS(time.Now())))
		storesStats.Observe(id, &pdpb.StoreStats{})
	}
	c.Assert(storesStats.GetStoresLoads(), HasLen, storeCount)

	// Store 1: heartbeat far in the past.
	staleHeartbeat := time.Now().Add(-24 * time.Hour)
	basicCluster.PutStore(basicCluster.GetStore(1).Clone(core.SetLastHeartbeatTS(staleHeartbeat)))
	// Store 2: tombstoned.
	basicCluster.PutStore(basicCluster.GetStore(2).Clone(core.TombstoneStore()))
	// Store 3: removed from the cluster entirely.
	basicCluster.DeleteStore(basicCluster.GetStore(3))

	storesStats.FilterUnhealthyStore(basicCluster)

	remaining := storesStats.GetStoresLoads()
	c.Assert(remaining, HasLen, 2)
	c.Assert(remaining[4], NotNil)
	c.Assert(remaining[5], NotNil)
}

0 comments on commit df54757

Please sign in to comment.