Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: optimize the processing speed of RegionHeartbeat #3739

Merged
9 commits merged (source and target branch names not captured in this view)
Jun 5, 2021
69 changes: 39 additions & 30 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,20 +573,25 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
origin, err := c.core.PreCheckPutRegion(region)
hotStat := c.hotStat
storage := c.storage
coreCluster := c.core
c.RUnlock()

origin, err := coreCluster.PreCheckPutRegion(region)
if err != nil {
c.RUnlock()
return err
}
expiredStats := c.hotStat.ExpiredItems(region)

expiredStats := hotStat.ExpiredItems(region)
// Put expiredStats into read/write queue to update stats
if len(expiredStats) > 0 {
for _, stat := range expiredStats {
item := statistics.NewExpiredStatItem(stat)
if stat.Kind == statistics.WriteFlow {
c.hotStat.CheckWriteAsync(item)
hotStat.CheckWriteAsync(item)
} else {
c.hotStat.CheckReadAsync(item)
hotStat.CheckReadAsync(item)
}
}
}
Expand All @@ -595,9 +600,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
item := statistics.NewPeerInfoItem(peerInfo, region)
c.hotStat.CheckWriteAsync(item)
hotStat.CheckWriteAsync(item)
}
c.RUnlock()

// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
Expand Down Expand Up @@ -680,6 +684,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
time.Sleep(500 * time.Millisecond)
})

var overlaps []*core.RegionInfo
c.Lock()
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
Expand All @@ -690,17 +695,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.Unlock()
return err
}
overlaps := c.core.PutRegion(region)
if c.storage != nil {
for _, item := range overlaps {
if err := c.storage.DeleteRegion(item.GetMeta()); err != nil {
log.Error("failed to delete region from storage",
zap.Uint64("region-id", item.GetID()),
logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())),
errs.ZapError(err))
}
}
}
overlaps = c.core.PutRegion(region)
for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
Expand Down Expand Up @@ -731,24 +726,38 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

changedRegions := c.changedRegions

c.Unlock()

// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
if saveKV && c.storage != nil {
if err := c.storage.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to storage is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
log.Error("failed to save region to storage",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
errs.ZapError(err))
if storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
// Not successfully saved to storage is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
for _, item := range overlaps {
if err := storage.DeleteRegion(item.GetMeta()); err != nil {
log.Error("failed to delete region from storage",
zap.Uint64("region-id", item.GetID()),
logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())),
errs.ZapError(err))
}
}
if saveKV {
if err := storage.SaveRegion(region.GetMeta()); err != nil {
log.Error("failed to save region to storage",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
errs.ZapError(err))
}
regionEventCounter.WithLabelValues("update_kv").Inc()
}
regionEventCounter.WithLabelValues("update_kv").Inc()
}

if saveKV || needSync {
select {
case c.changedRegions <- region:
case changedRegions <- region:
default:
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ func (s *testRegionsInfoSuite) Test(c *C) {
c.Assert(cache.SearchRegion(regionKey), IsNil)
checkRegions(c, cache, regions[0:i])

cache.AddRegion(region)
cache.SetRegion(region)
checkRegion(c, cache.GetRegion(i), region)
checkRegion(c, cache.SearchRegion(regionKey), region)
checkRegions(c, cache, regions[0:(i+1)])
Expand All @@ -952,7 +952,7 @@ func (s *testRegionsInfoSuite) Test(c *C) {
// Reset leader to peer 0.
newRegion = region.Clone(core.WithLeader(region.GetPeers()[0]))
regions[i] = newRegion
cache.AddRegion(newRegion)
cache.SetRegion(newRegion)
checkRegion(c, cache.GetRegion(i), newRegion)
checkRegions(c, cache, regions[0:(i+1)])
checkRegion(c, cache.SearchRegion(regionKey), newRegion)
Expand All @@ -971,7 +971,7 @@ func (s *testRegionsInfoSuite) Test(c *C) {
// check overlaps
// clone it otherwise there are two items with the same key in the tree
overlapRegion := regions[n-1].Clone(core.WithStartKey(regions[n-2].GetStartKey()))
cache.AddRegion(overlapRegion)
cache.SetRegion(overlapRegion)
c.Assert(cache.GetRegion(n-2), IsNil)
c.Assert(cache.GetRegion(n-1), NotNil)

Expand Down
28 changes: 16 additions & 12 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,30 +282,34 @@ func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
bc.Stores.DeleteStore(store)
}

// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) {
bc.RLock()
origin := bc.Regions.GetRegion(region.GetID())
defer bc.RUnlock()
origin = bc.Regions.GetRegion(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
for _, item := range bc.Regions.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
bc.RUnlock()
return nil, errRegionIsStale(region.GetMeta(), item.GetMeta())
}
overlaps = bc.Regions.GetOverlaps(region)
}
return
}

// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
origin, overlaps := bc.getRelevantRegions(region)
for _, item := range overlaps {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
return nil, errRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
bc.RUnlock()
if origin == nil {
return nil, nil
}

r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()

// TiKV reports term after v3.0
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()

// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() || isTermBehind {
if isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return origin, errRegionIsStale(region.GetMeta(), origin.GetMeta())
}

Expand Down
Loading