diff --git a/server/schedule/checker/replica_strategy.go b/server/schedule/checker/replica_strategy.go
new file mode 100644
index 00000000000..ef3159975cf
--- /dev/null
+++ b/server/schedule/checker/replica_strategy.go
@@ -0,0 +1,132 @@
+// Copyright 2020 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package checker
+
+import (
+	"github.com/pingcap/log"
+	"github.com/tikv/pd/server/core"
+	"github.com/tikv/pd/server/schedule/filter"
+	"github.com/tikv/pd/server/schedule/opt"
+	"go.uber.org/zap"
+)
+
+// ReplicaStrategy collects some utilities used to manipulate region peers. It
+// exists so that replica_checker and rule_checker can reuse common logic.
+type ReplicaStrategy struct {
+	checkerName    string // replica-checker / rule-checker
+	cluster        opt.Cluster
+	locationLabels []string
+	isolationLevel string
+	region         *core.RegionInfo
+	extraFilters   []filter.Filter
+}
+
+// SelectStoreToAdd returns the ID of the store where a new replica of the
+// region should be placed.
+// `coLocationStores` are the stores whose locations are compared with that
+// of the target store.
+// `extraFilters` sets up extra filters based on the context in which this
+// method is called.
+//
+// For example, to select a target store to replace one of a region's peers,
+// we can pass the peer list with that peer removed as `coLocationStores`.
+// In addition, we need extra constraints to ensure that the isolation level
+// does not decrease after the replacement.
+func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, extraFilters ...filter.Filter) uint64 {
+	// The selection process works in two stages. The first stage ignores
+	// the temporary state of the stores and selects the stores with the
+	// highest isolation score according to the location labels. The second
+	// stage considers all temporary states and capacity factors to pick
+	// the most suitable target.
+	//
+	// Splitting the selection this way prevents a store's short-term state
+	// from causing non-optimal replica placement and redundant scheduling.
+	filters := []filter.Filter{
+		filter.NewExcludedFilter(s.checkerName, nil, s.region.GetStoreIds()),
+		filter.NewStorageThresholdFilter(s.checkerName),
+		filter.NewSpecialUseFilter(s.checkerName),
+		&filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowTemporaryStates: true},
+	}
+	if len(s.locationLabels) > 0 && s.isolationLevel != "" {
+		filters = append(filters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores))
+	}
+	if len(extraFilters) > 0 {
+		filters = append(filters, extraFilters...)
+	}
+	if len(s.extraFilters) > 0 {
+		filters = append(filters, s.extraFilters...)
+	}
+
+	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
+	strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true}
+	target := filter.NewCandidates(s.cluster.GetStores()).
+		FilterTarget(s.cluster.GetOpts(), filters...).
+		Sort(isolationComparer).Reverse().Top(isolationComparer). // greater isolation score is better
+		Sort(filter.RegionScoreComparer(s.cluster.GetOpts())). // less region score is better
+		FilterTarget(s.cluster.GetOpts(), strictStateFilter).PickFirst() // the filter does not ignore temp states
+	if target == nil {
+		return 0
+	}
+	return target.GetID()
+}
+
+// SelectStoreToFix returns a store to replace a down/offline peer. The location
+// placement after scheduling is allowed to be worse than the original.
+func (s *ReplicaStrategy) SelectStoreToFix(coLocationStores []*core.StoreInfo, old uint64) uint64 {
+	// Trick to avoid allocating a new slice with `old` removed.
+	s.swapStoreToFirst(coLocationStores, old)
+	return s.SelectStoreToAdd(coLocationStores[1:])
+}
+
+// SelectStoreToImprove returns a store to replace the old store. The location
+// placement after scheduling should be better than the original.
+func (s *ReplicaStrategy) SelectStoreToImprove(coLocationStores []*core.StoreInfo, old uint64) uint64 {
+	// Trick to avoid allocating a new slice with `old` removed.
+	s.swapStoreToFirst(coLocationStores, old)
+	oldStore := s.cluster.GetStore(old)
+	if oldStore == nil {
+		return 0
+	}
+	filters := []filter.Filter{
+		filter.NewLocationImprover(s.checkerName, s.locationLabels, coLocationStores, oldStore),
+	}
+	if len(s.locationLabels) > 0 && s.isolationLevel != "" {
+		filters = append(filters, filter.NewIsolationFilter(s.checkerName, s.isolationLevel, s.locationLabels, coLocationStores[1:]))
+	}
+	return s.SelectStoreToAdd(coLocationStores[1:], filters...)
+}
+
+// swapStoreToFirst moves the store with the given ID to index 0 so that
+// callers can exclude it by re-slicing with stores[1:].
+func (s *ReplicaStrategy) swapStoreToFirst(stores []*core.StoreInfo, id uint64) {
+	for i, store := range stores {
+		if store.GetID() == id {
+			stores[0], stores[i] = stores[i], stores[0]
+			return
+		}
+	}
+}
+
+// SelectStoreToRemove returns the best candidate store to remove a replica from.
+func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo) uint64 {
+	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
+	source := filter.NewCandidates(coLocationStores).
+		FilterSource(s.cluster.GetOpts(), &filter.StoreStateFilter{ActionScope: replicaCheckerName, MoveRegion: true}).
+		Sort(isolationComparer).Top(isolationComparer). // less isolation score is better
+		Sort(filter.RegionScoreComparer(s.cluster.GetOpts())).Reverse(). // greater region score is better
+		PickFirst()
+	if source == nil {
+		log.Debug("no removable store", zap.Uint64("region-id", s.region.GetID()))
+		return 0
+	}
+	return source.GetID()
+}
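// Annotation (not part of the patch): a minimal sketch of how a checker could
// drive ReplicaStrategy. `cluster` and `region` are assumed to come from the
// caller's context; the helper name and location labels are illustrative.
func pickStoreForNewPeer(cluster opt.Cluster, region *core.RegionInfo) uint64 {
	strategy := &ReplicaStrategy{
		checkerName:    "replica-checker",
		cluster:        cluster,
		locationLabels: []string{"zone", "rack", "host"}, // hypothetical topology
		region:         region,
	}
	// The stores that already hold the region's peers form the co-location set.
	return strategy.SelectStoreToAdd(cluster.GetRegionStores(region)) // 0 means no suitable store
}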
diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go
index ddf513d0069..3ef50bb1f02 100644
--- a/server/schedule/region_scatterer.go
+++ b/server/schedule/region_scatterer.go
@@ -277,7 +277,10 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 	// Group peers by the engine of their stores
 	for _, peer := range region.GetPeers() {
 		store := r.cluster.GetStore(peer.GetStoreId())
-		if ordinaryFilter.Target(r.cluster, store) {
+		if store == nil {
+			return nil
+		}
+		if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
 			ordinaryPeers[peer.GetId()] = peer
 		} else {
 			engine := store.GetLabelValue(filter.EngineKey)
@@ -415,7 +418,10 @@ func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[ui
 	leaderCandidateStores := make([]uint64, 0)
 	for storeID := range peers {
 		store := r.cluster.GetStore(storeID)
+		if store == nil {
+			return 0
+		}
 		engine := store.GetLabelValue(filter.EngineKey)
 		if len(engine) < 1 {
 			leaderCandidateStores = append(leaderCandidateStores, storeID)
@@ -439,7 +445,10 @@ func (r *RegionScatterer) Put(peers map[uint64]*metapb.Peer, leaderStoreID uint6
 	for _, peer := range peers {
 		storeID := peer.GetStoreId()
 		store := r.cluster.GetStore(storeID)
-		if ordinaryFilter.Target(r.cluster, store) {
+		if store == nil {
+			continue
+		}
+		if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
 			r.ordinaryEngine.selectedPeer.Put(storeID, group)
 			scatterDistributionCounter.WithLabelValues(
 				fmt.Sprintf("%v", storeID),
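// Annotation (not part of the patch): the guards above exist because GetStore
// returns nil once a store has been removed from the cluster, as the new
// store_test.go below exercises. A minimal sketch, using only calls that
// appear in that test:
func getStoreMayReturnNil() {
	cluster := core.NewBasicCluster()
	cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: 1}, core.SetLastHeartbeatTS(time.Now())))
	cluster.DeleteStore(cluster.GetStore(1))
	// A region can still carry a peer that points at store 1, so schedulers
	// must check for nil before dereferencing the lookup result.
	if store := cluster.GetStore(1); store == nil {
		return // bail out instead of panicking
	}
}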
diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go
index 334caccefda..2f963d93a18 100644
--- a/server/schedulers/shuffle_region.go
+++ b/server/schedulers/shuffle_region.go
@@ -159,12 +159,16 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster opt.Cluster) (*core.
 }
 
 func (s *shuffleRegionScheduler) scheduleAddPeer(cluster opt.Cluster, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
+	store := cluster.GetStore(oldPeer.GetStoreId())
+	if store == nil {
+		return nil
+	}
 	var scoreGuard filter.Filter
 	if cluster.IsPlacementRulesEnabled() {
 		scoreGuard = filter.NewRuleFitFilter(s.GetName(), cluster, region, oldPeer.GetStoreId())
 	} else {
-		scoreGuard = filter.NewDistinctScoreFilter(s.GetName(), cluster.GetLocationLabels(), cluster.GetRegionStores(region), cluster.GetStore(oldPeer.GetStoreId()))
+		scoreGuard = filter.NewDistinctScoreFilter(s.GetName(), cluster.GetLocationLabels(), cluster.GetRegionStores(region), store)
 	}
 	excludedFilter := filter.NewExcludedFilter(s.GetName(), nil, region.GetStoreIds())
 
 	stores := cluster.GetStores()
diff --git a/server/statistics/store.go b/server/statistics/store.go
index 03899283e17..5e6c8d0c48b 100644
--- a/server/statistics/store.go
+++ b/server/statistics/store.go
@@ -293,7 +293,8 @@ func (s *StoresStats) FilterUnhealthyStore(cluster core.StoreSetInformer) {
 	s.Lock()
 	defer s.Unlock()
 	for storeID := range s.rollingStoresStats {
-		if s.storeIsUnhealthy(cluster, storeID) {
+		store := cluster.GetStore(storeID)
+		if store == nil || store.IsTombstone() || store.IsUnhealthy() || store.IsPhysicallyDestroyed() {
 			delete(s.rollingStoresStats, storeID)
 		}
 	}
diff --git a/server/statistics/store_test.go b/server/statistics/store_test.go
new file mode 100644
index 00000000000..e3247ea1c46
--- /dev/null
+++ b/server/statistics/store_test.go
@@ -0,0 +1,48 @@
+// Copyright 2021 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+import (
+	"time"
+
+	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/tikv/pd/server/core"
+)
+
+var _ = Suite(&testStoreSuite{})
+
+type testStoreSuite struct{}
+
+func (s *testStoreSuite) TestFilterUnhealthyStore(c *C) {
+	stats := NewStoresStats()
+	cluster := core.NewBasicCluster()
+	for i := uint64(1); i <= 5; i++ {
+		cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: i}, core.SetLastHeartbeatTS(time.Now())))
+		stats.Observe(i, &pdpb.StoreStats{})
+	}
+	c.Assert(stats.GetStoresLoads(), HasLen, 5)
+
+	cluster.PutStore(cluster.GetStore(1).Clone(core.SetLastHeartbeatTS(time.Now().Add(-24 * time.Hour))))
+	cluster.PutStore(cluster.GetStore(2).Clone(core.TombstoneStore()))
+	cluster.DeleteStore(cluster.GetStore(3))
+
+	stats.FilterUnhealthyStore(cluster)
+	loads := stats.GetStoresLoads()
+	c.Assert(loads, HasLen, 2)
+	c.Assert(loads[4], NotNil)
+	c.Assert(loads[5], NotNil)
+}
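// Annotation (not part of the patch): the re-slicing trick behind
// swapStoreToFirst in replica_strategy.go above. Swapping the store to be
// replaced into index 0 lets SelectStoreToFix and SelectStoreToImprove pass
// stores[1:] instead of allocating a copy with the old store filtered out.
func swapTrick() {
	stores := []uint64{4, 7, 9} // illustrative store IDs
	old := uint64(7)
	for i, id := range stores {
		if id == old {
			stores[0], stores[i] = stores[i], stores[0] // stores is now [7, 4, 9]
			break
		}
	}
	rest := stores[1:] // [4, 9]: `old` excluded without a new allocation
	_ = rest
}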