store-tikv: drop invalid cached region #4506
@@ -16,6 +16,8 @@ package tikv
 import (
 	"bytes"
 	"sync"
+	"sync/atomic"
+	"time"

 	log "github.com/Sirupsen/logrus"
 	"github.com/juju/errors"
@@ -26,12 +28,23 @@ import (
 	goctx "golang.org/x/net/context"
 )

+const (
+	rcDefaultRegionCacheTTL = time.Minute * 10
+)
+
+// CachedRegion encapsulates {Region, TTL}
+type CachedRegion struct {
+	region     *Region
+	lastAccess int64
+}
+
 // RegionCache caches Regions loaded from PD.
 type RegionCache struct {
 	pdClient pd.Client
-	mu struct {
+
+	mu struct {
 		sync.RWMutex
-		regions map[RegionVerID]*Region
+		regions map[RegionVerID]*CachedRegion
 		sorted  *llrb.LLRB
 	}
 	storeMu struct {

Review comment on the lastAccess field (see the sketch below): "we use int64 to enable atomic operation"
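The comment above notes that lastAccess is kept as an int64 so it can be touched with sync/atomic rather than under the cache mutex. A minimal, self-contained sketch of that pattern (illustrative names, not code from this PR):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// ttlEntry mirrors the idea behind CachedRegion: the last-access time is kept
// as Unix seconds in an int64 so readers and writers can use sync/atomic
// instead of taking a lock. (Illustrative type, not part of the PR.)
type ttlEntry struct {
	lastAccess int64 // Unix seconds, accessed only via sync/atomic
}

// touch records an access without taking any lock.
func (e *ttlEntry) touch() {
	atomic.StoreInt64(&e.lastAccess, time.Now().Unix())
}

// valid reports whether the entry was accessed within ttl.
func (e *ttlEntry) valid(ttl time.Duration) bool {
	last := atomic.LoadInt64(&e.lastAccess)
	return time.Since(time.Unix(last, 0)) < ttl
}

func main() {
	e := &ttlEntry{}
	e.touch()
	fmt.Println(e.valid(10 * time.Minute)) // true immediately after touch
}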
@@ -45,7 +58,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c := &RegionCache{
 		pdClient: pdClient,
 	}
-	c.mu.regions = make(map[RegionVerID]*Region)
+	c.mu.regions = make(map[RegionVerID]*CachedRegion)
 	c.mu.sorted = llrb.New()
 	c.storeMu.stores = make(map[uint64]*Store)
 	return c
@@ -66,17 +79,36 @@ func (c *RPCContext) GetStoreID() uint64 {
 	return 0
 }

+func (c *CachedRegion) isValid() bool {
+	lastAccess := atomic.LoadInt64(&c.lastAccess)
+	lastAccessTime := time.Unix(lastAccess, 0)
+	return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL
+}
+
+// GetCachedRegion returns a valid region
+func (c *RegionCache) GetCachedRegion(id RegionVerID) *Region {
+	c.mu.RLock()
+	cachedregion, ok := c.mu.regions[id]
+	c.mu.RUnlock()
+	if !ok {
+		return nil
+	}
+	if cachedregion.isValid() {
+		atomic.StoreInt64(&cachedregion.lastAccess, time.Now().Unix())
+		return cachedregion.region
+	}
+	c.DropRegion(id)
+	return nil
+}
+
 // GetRPCContext returns RPCContext for a region. If it returns nil, the region
 // must be out of date and already dropped from cache.
 func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) {
-	c.mu.RLock()
-	region, ok := c.mu.regions[id]
-	if !ok {
-		c.mu.RUnlock()
+	region := c.GetCachedRegion(id)
+	if region == nil {
 		return nil, nil
 	}
 	kvCtx := region.GetContext()
-	c.mu.RUnlock()

 	addr, err := c.GetStoreAddr(bo, kvCtx.GetPeer().GetStoreId())
 	if err != nil {
@@ -109,26 +141,24 @@ func (l *KeyLocation) Contains(key []byte) bool {

 // LocateKey searches for the region and range that the key is located.
 func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error) {
-	c.mu.RLock()
-	if r := c.getRegionFromCache(key); r != nil {
+	r := c.getRegionFromCache(key)
+	if r != nil {
 		loc := &KeyLocation{
 			Region:   r.VerID(),
 			StartKey: r.StartKey(),
 			EndKey:   r.EndKey(),
 		}
-		c.mu.RUnlock()
 		return loc, nil
 	}
-	c.mu.RUnlock()

 	r, err := c.loadRegion(bo, key)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}

 	c.mu.Lock()
-	defer c.mu.Unlock()
-	r = c.insertRegionToCache(r)
+	c.insertRegionToCache(r)
+	c.mu.Unlock()
 	return &KeyLocation{
 		Region:   r.VerID(),
 		StartKey: r.StartKey(),
@@ -138,26 +168,24 @@ func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error) | |
|
||
// LocateRegionByID searches for the region with ID | ||
func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLocation, error) { | ||
c.mu.RLock() | ||
if r := c.getRegionByIDFromCache(regionID); r != nil { | ||
r := c.getRegionByIDFromCache(regionID) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @disksing Should we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They have different parameters. BTW, I think we need lock here. @atmzhou There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @disksing I remember that you said this function is only used for test. So I did not add lock in getRegionByIDFromCache. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. We don't want debug API have data race too, especially it may affect online service. |
||
if r != nil { | ||
loc := &KeyLocation{ | ||
Region: r.VerID(), | ||
StartKey: r.StartKey(), | ||
EndKey: r.EndKey(), | ||
} | ||
c.mu.RUnlock() | ||
return loc, nil | ||
} | ||
c.mu.RUnlock() | ||
|
||
r, err := c.loadRegionByID(bo, regionID) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
r = c.insertRegionToCache(r) | ||
c.insertRegionToCache(r) | ||
c.mu.Unlock() | ||
return &KeyLocation{ | ||
Region: r.VerID(), | ||
StartKey: r.StartKey(), | ||
|
@@ -209,58 +237,57 @@ func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey []
 func (c *RegionCache) DropRegion(id RegionVerID) {
 	c.mu.Lock()
 	defer c.mu.Unlock()

 	c.dropRegionFromCache(id)
 }

 // UpdateLeader update some region cache with newer leader info.
 func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	r, ok := c.mu.regions[regionID]
-	if !ok {
+	r := c.GetCachedRegion(regionID)
+	if r == nil {
 		log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderStoreID)
 		return
 	}

 	if !r.SwitchPeer(leaderStoreID) {
 		log.Debugf("regionCache: cannot find peer when updating leader %d,%d", regionID, leaderStoreID)
-		c.dropRegionFromCache(r.VerID())
+		c.DropRegion(r.VerID())
 	}
 }

 func (c *RegionCache) getRegionFromCache(key []byte) *Region {
+	c.mu.RLock()
 	var r *Region
 	c.mu.sorted.DescendLessOrEqual(newRBSearchItem(key), func(item llrb.Item) bool {
 		r = item.(*llrbItem).region
 		return false
 	})
+	c.mu.RUnlock()
 	if r != nil && r.Contains(key) {
-		return r
+		return c.GetCachedRegion(r.VerID())
 	}
 	return nil
 }

-// insertRegionToCache tries to insert the Region to cache. If there is an old
-// Region with the same VerID, it will return the old one instead.
+// insertRegionToCache tries to insert the Region to cache.
 func (c *RegionCache) insertRegionToCache(r *Region) *Region {
-	if old, ok := c.mu.regions[r.VerID()]; ok {
-		return old
-	}
 	old := c.mu.sorted.ReplaceOrInsert(newRBItem(r))
 	if old != nil {
 		delete(c.mu.regions, old.(*llrbItem).region.VerID())
 	}
-	c.mu.regions[r.VerID()] = r
+	c.mu.regions[r.VerID()] = &CachedRegion{
+		region:     r,
+		lastAccess: time.Now().Unix(),
+	}
 	return r
 }

 // getRegionByIDFromCache tries to get region by regionID from cache
 func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	for v, r := range c.mu.regions {
 		if v.id == regionID {
-			return r
+			return r.region
 		}
 	}
 	return nil
@@ -271,8 +298,8 @@ func (c *RegionCache) dropRegionFromCache(verID RegionVerID) {
 	if !ok {
 		return
 	}
-	c.mu.sorted.Delete(newRBItem(r))
-	delete(c.mu.regions, r.VerID())
+	c.mu.sorted.Delete(newRBItem(r.region))
+	delete(c.mu.regions, verID)
 }

 // loadRegion loads region from pd client, and picks the first peer as leader.
@@ -403,13 +430,13 @@ func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) {
 	// Switch region's leader peer to next one.
 	regionID := ctx.Region
 	c.mu.Lock()
-	if region, ok := c.mu.regions[regionID]; ok {
+	if cachedregion, ok := c.mu.regions[regionID]; ok {
+		region := cachedregion.region
 		if !region.OnRequestFail(ctx.KVCtx.GetPeer().GetStoreId()) {
 			c.dropRegionFromCache(regionID)
 		}
 	}
 	c.mu.Unlock()

 	// Store's meta may be out of date.
 	storeID := ctx.KVCtx.GetPeer().GetStoreId()
 	c.storeMu.Lock()
@@ -420,7 +447,7 @@ func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) {

 	c.mu.Lock()
 	for id, r := range c.mu.regions {
-		if r.peer.GetStoreId() == storeID {
+		if r.region.peer.GetStoreId() == storeID {
 			c.dropRegionFromCache(id)
 		}
 	}
Review comment on the CachedRegion struct (an illustrative sketch follows): How about "region Region"?
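A small sketch of the alternative layout the comment suggests, embedding the Region by value instead of holding a *Region. The Region type here is a stand-in, and this is not the merged code: with a value field, the cache entry owns its own copy, so later mutations through the original pointer are not visible in the cache.

package main

import (
	"fmt"
	"time"
)

// Region is a stand-in for the real tikv Region type (illustrative only).
type Region struct {
	id uint64
}

// CachedRegion with the Region embedded by value, as the review comment
// suggests, instead of holding a *Region.
type CachedRegion struct {
	region     Region // the cache entry owns its own copy
	lastAccess int64  // Unix seconds; the real code updates this via sync/atomic
}

func main() {
	r := &Region{id: 2}
	entry := &CachedRegion{
		region:     *r, // copied on insert
		lastAccess: time.Now().Unix(),
	}
	r.id = 3                     // a later mutation through the pointer...
	fmt.Println(entry.region.id) // ...is not seen by the cache entry: prints 2
}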