Skip to content

Commit

Permalink
Merge branch 'master' into support_watch_delete_prev
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Sep 22, 2023
2 parents 06470c6 + 301b917 commit f9600a8
Show file tree
Hide file tree
Showing 23 changed files with 145 additions and 345 deletions.
40 changes: 17 additions & 23 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,14 +682,9 @@ func (r *RegionInfo) isRegionRecreated() bool {
return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0)
}

// RegionChanged is a struct that records the changes of the region.
type RegionChanged struct {
IsNew, SaveKV, SaveCache, NeedSync bool
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) *RegionChanged
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -702,19 +697,18 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark IsNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (changed *RegionChanged) {
changed = &RegionChanged{}
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
changed.SaveKV, changed.SaveCache, changed.IsNew = true, true, true
saveKV, saveCache, isNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
changed.IsNew = true
isNew = true
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
Expand All @@ -727,7 +721,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
changed.SaveKV, changed.SaveCache = true, true
saveKV, saveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
Expand All @@ -738,11 +732,11 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
changed.SaveCache, changed.SaveKV = true, true
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
changed.IsNew = true
isNew = true
} else if log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
Expand All @@ -751,57 +745,57 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return because the log is important for us to investigate,
changed.SaveCache, changed.NeedSync = true, true
saveCache, needSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
changed.SaveCache, changed.SaveKV = true, true
saveKV, saveCache = true, true
return
}
if len(region.GetBuckets().GetKeys()) != len(origin.GetBuckets().GetKeys()) {
if log.GetLevel() <= zap.DebugLevel {
debug("bucket key changed", zap.Uint64("region-id", region.GetID()))
}
changed.SaveCache, changed.SaveKV = true, true
saveKV, saveCache = true, true
return
}
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() ||
region.flowRoundDivisor < origin.flowRoundDivisor {
changed.SaveCache, changed.NeedSync = true, true
saveCache, needSync = true, true
return
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
}
changed.SaveCache, changed.NeedSync = true, true
saveCache, needSync = true, true
return
}
if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
}
changed.SaveCache, changed.NeedSync = true, true
saveCache, needSync = true, true
return
}
if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
changed.SaveCache = true
saveCache = true
return
}
if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
changed.SaveCache = true
saveCache = true
return
}
// Do not save to kv, because 1) flashback will be eventually set to
// false, 2) flashback changes almost all regions in a cluster.
// Saving kv may downgrade PD performance when there are many regions.
if region.IsFlashbackChanged(origin) {
changed.SaveCache = true
saveCache = true
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
changed := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, changed.NeedSync)
_, _, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}

Expand Down
39 changes: 16 additions & 23 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ const (
initialMinSpace = 8 * units.GiB // 2^33=8GB
slowStoreThreshold = 80
awakenStoreInterval = 10 * time.Minute // 2 * slowScoreRecoveryTime
splitStoreWait = time.Minute

// EngineKey is the label key used to indicate engine.
EngineKey = "engine"
Expand All @@ -51,23 +50,22 @@ const (
type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
recentlySplitRegionsTime time.Time
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -541,11 +539,6 @@ func (s *StoreInfo) NeedAwakenStore() bool {
return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval
}

// HasRecentlySplitRegions checks if there are some region are splitted in this store.
func (s *StoreInfo) HasRecentlySplitRegions() bool {
return time.Since(s.recentlySplitRegionsTime) < splitStoreWait
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's
Expand Down
7 changes: 0 additions & 7 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,3 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
store.lastAwakenTime = lastAwaken
}
}

// SetRecentlySplitRegionsTime sets last split time for the store.
func SetRecentlySplitRegionsTime(recentlySplitRegionsTime time.Time) StoreCreateOption {
return func(store *StoreInfo) {
store.recentlySplitRegionsTime = recentlySplitRegionsTime
}
}
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
if consumption == nil {
continue
}
ruLabelType := tidbTypeLabel
ruLabelType := defaultTypeLabel
if consumptionInfo.isBackground {
ruLabelType = backgroundTypeLabel
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ const (
readTypeLabel = "read"
writeTypeLabel = "write"
backgroundTypeLabel = "background"
tiflashTypeLabel = "tiflash"
tidbTypeLabel = "tidb"
tiflashTypeLabel = "ap"
defaultTypeLabel = "tp"
)

var (
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
changed := core.GenerateRegionGuideFunc(true)(region, origin)
if !changed.SaveCache && !changed.IsNew {
isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
if !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand All @@ -444,7 +444,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
}

var overlaps []*core.RegionInfo
if changed.SaveCache {
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
Expand All @@ -456,7 +456,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
cluster.HandleOverlaps(c, overlaps)
}

cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared())
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/schedule/filter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ const (
storeStateTooManyPendingPeer
storeStateRejectLeader
storeStateSlowTrend
storeStateRecentlySplitRegions

filtersLen
)
Expand Down Expand Up @@ -157,7 +156,6 @@ var filters = [filtersLen]string{
"store-state-too-many-pending-peers-filter",
"store-state-reject-leader-filter",
"store-state-slow-trend-filter",
"store-state-recently-split-regions-filter",
}

// String implements fmt.Stringer interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/filter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestString(t *testing.T) {
expected string
}{
{int(storeStateTombstone), "store-state-tombstone-filter"},
{int(filtersLen - 1), "store-state-recently-split-regions-filter"},
{int(filtersLen - 1), "store-state-slow-trend-filter"},
{int(filtersLen), "unknown"},
}

Expand Down
13 changes: 1 addition & 12 deletions pkg/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ type StoreStateFilter struct {
// If it checks failed, the operator will be put back to the waiting queue util the limit is available.
// But the scheduler should keep the same with the operator level.
OperatorLevel constant.PriorityLevel
// check the store not split recently in it if set true.
ForbidRecentlySplitRegions bool
// Reason is used to distinguish the reason of store state filter
Reason filterType
}
Expand Down Expand Up @@ -473,15 +471,6 @@ func (f *StoreStateFilter) hasRejectLeaderProperty(conf config.SharedConfigProvi
return statusOK
}

func (f *StoreStateFilter) hasRecentlySplitRegions(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
if f.ForbidRecentlySplitRegions && store.HasRecentlySplitRegions() {
f.Reason = storeStateRecentlySplitRegions
return statusStoreRecentlySplitRegions
}
f.Reason = storeStateOK
return statusOK
}

// The condition table.
// Y: the condition is temporary (expected to become false soon).
// N: the condition is expected to be true for a long time.
Expand Down Expand Up @@ -510,7 +499,7 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr
var funcs []conditionFunc
switch typ {
case leaderSource:
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected, f.hasRecentlySplitRegions}
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected}
case regionSource:
funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
case witnessSource:
Expand Down
45 changes: 19 additions & 26 deletions pkg/schedule/filter/region_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ import (
"github.com/tikv/pd/pkg/slice"
)

// SelectRegions selects regions that be selected from the list.
func SelectRegions(regions []*core.RegionInfo, filters ...RegionFilter) []*core.RegionInfo {
return filterRegionsBy(regions, func(r *core.RegionInfo) bool {
return slice.AllOf(filters, func(i int) bool {
return filters[i].Select(r).IsOK()
})
})
}

func filterRegionsBy(regions []*core.RegionInfo, keepPred func(*core.RegionInfo) bool) (selected []*core.RegionInfo) {
for _, s := range regions {
if keepPred(s) {
selected = append(selected, s)
}
}
return
}

// SelectOneRegion selects one region that be selected from the list.
func SelectOneRegion(regions []*core.RegionInfo, collector *plan.Collector, filters ...RegionFilter) *core.RegionInfo {
for _, r := range regions {
Expand Down Expand Up @@ -155,7 +173,7 @@ type SnapshotSenderFilter struct {
senders map[uint64]struct{}
}

// NewSnapshotSendFilter returns creates a RegionFilter that filters regions whose leader has sender limit on the specific store.
// NewSnapshotSendFilter returns creates a RegionFilter that filters regions with witness peer on the specific store.
// level should be set as same with the operator priority level.
func NewSnapshotSendFilter(stores []*core.StoreInfo, level constant.PriorityLevel) RegionFilter {
senders := make(map[uint64]struct{})
Expand All @@ -175,28 +193,3 @@ func (f *SnapshotSenderFilter) Select(region *core.RegionInfo) *plan.Status {
}
return statusRegionLeaderSendSnapshotThrottled
}

// StoreRecentlySplitFilter filer the region whose leader store not recently split regions.
type StoreRecentlySplitFilter struct {
recentlySplitStores map[uint64]struct{}
}

// NewStoreRecentlySplitFilter returns creates a StoreRecentlySplitFilter.
func NewStoreRecentlySplitFilter(stores []*core.StoreInfo) RegionFilter {
recentlySplitStores := make(map[uint64]struct{})
for _, store := range stores {
if store.HasRecentlySplitRegions() {
recentlySplitStores[store.GetID()] = struct{}{}
}
}
return &StoreRecentlySplitFilter{recentlySplitStores: recentlySplitStores}
}

// Select returns ok if the region leader not in the recentlySplitStores.
func (f *StoreRecentlySplitFilter) Select(region *core.RegionInfo) *plan.Status {
leaderStoreID := region.GetLeader().GetStoreId()
if _, ok := f.recentlySplitStores[leaderStoreID]; ok {
return statusStoreRecentlySplitRegions
}
return statusOK
}
5 changes: 2 additions & 3 deletions pkg/schedule/filter/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ var (
// store config limitation
statusStoreRejectLeader = plan.NewStatus(plan.StatusStoreRejectLeader)

statusStoreNotMatchRule = plan.NewStatus(plan.StatusStoreNotMatchRule)
statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation)
statusStoreRecentlySplitRegions = plan.NewStatus(plan.StatusStoreRecentlySplitRegions)
statusStoreNotMatchRule = plan.NewStatus(plan.StatusStoreNotMatchRule)
statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation)

// region filter status
statusRegionPendingPeer = plan.NewStatus(plan.StatusRegionUnhealthy)
Expand Down
Loading

0 comments on commit f9600a8

Please sign in to comment.