60 changes: 17 additions & 43 deletions internal/locate/region_cache.go
@@ -376,7 +376,7 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *router.Region) (*R
leaderAccessIdx = AccessIndex(len(rs.accessIndex[tiKVOnly]))
}
availablePeers = append(availablePeers, p)
switch store.storeType {
switch store.StoreType() {
case tikvrpc.TiKV:
rs.accessIndex[tiKVOnly] = append(rs.accessIndex[tiKVOnly], len(rs.stores))
case tikvrpc.TiFlash:
@@ -732,7 +732,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
c.bg.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
// check and resolve normal stores periodically by default.
filter := func(state resolveState) bool {
return state != unresolved && state != tombstone && state != deleted
return state != unresolved && state != tombstone
}
if t.IsZero() {
// check and resolve needCheck stores because it's triggered by a CheckStoreEvent this time.
@@ -799,7 +799,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*

needCheckStores = c.stores.filter(needCheckStores, needCheck)
for _, store := range needCheckStores {
_, err := store.reResolve(c.stores, c.bg)
_, err := store.reResolve(c.stores)
tikverr.Log(err)
}
return needCheckStores
@@ -835,12 +835,12 @@ type RPCContext struct {
func (c *RPCContext) String() string {
var runStoreType string
if c.Store != nil {
runStoreType = c.Store.storeType.Name()
runStoreType = c.Store.StoreType().Name()
}
res := fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d, reqStoreType: %s, runStoreType: %s",
c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.AccessIdx, c.AccessMode, runStoreType)
if c.ProxyStore != nil {
res += fmt.Sprintf(", proxy store id: %d, proxy addr: %s", c.ProxyStore.storeID, c.ProxyStore.addr)
res += fmt.Sprintf(", proxy store id: %d, proxy addr: %s", c.ProxyStore.storeID, c.ProxyStore.GetAddr())
}
return res
}
@@ -936,7 +936,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
cachedRegion.invalidate(Other)
logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
zap.String("store", store.GetAddr()))
return nil, nil
}

@@ -1005,7 +1005,7 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
continue
}
if !labelFilter(store.labels) {
if !labelFilter(store.GetLabels()) {
continue
}
allStores = append(allStores, store.storeID)
@@ -1040,7 +1040,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
for i := 0; i < regionStore.accessStoreNum(tiFlashOnly); i++ {
accessIdx := AccessIndex((sIdx + i) % regionStore.accessStoreNum(tiFlashOnly))
storeIdx, store := regionStore.accessStore(tiFlashOnly, accessIdx)
if !labelFilter(store.labels) {
if !labelFilter(store.GetLabels()) {
continue
}
addr, err := c.getStoreAddr(bo, cachedRegion, store)
@@ -1052,7 +1052,7 @@
return nil, nil
}
if store.getResolveState() == needCheck {
_, err := store.reResolve(c.stores, c.bg)
_, err := store.reResolve(c.stores)
tikverr.Log(err)
}
regionStore.workTiFlashIdx.Store(int32(accessIdx))
@@ -1062,7 +1062,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
cachedRegion.invalidate(Other)
logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store",
zap.Uint64("region", id.GetID()),
zap.String("store", store.addr))
zap.String("store", store.GetAddr()))
// TiFlash will always try to find out a valid peer, avoiding to retry too many times.
continue
}
@@ -1788,7 +1788,7 @@ func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionS
// invalidate regions in store.
epoch := rs.storeEpochs[storeIdx]
if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.GetAddr()))
incEpochStoreIdx = storeIdx
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
}
@@ -2199,14 +2199,14 @@ func (c *RegionCache) searchCachedRegionByID(regionID uint64) (*Region, bool) {
// GetStoresByType gets stores by type `typ`
func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store {
return c.stores.filter(nil, func(s *Store) bool {
return s.getResolveState() == resolved && s.storeType == typ
return s.getResolveState() == resolved && s.StoreType() == typ
})
}

// GetAllStores gets TiKV and TiFlash stores.
func (c *RegionCache) GetAllStores() []*Store {
return c.stores.filter(nil, func(s *Store) bool {
return s.getResolveState() == resolved && (s.storeType == tikvrpc.TiKV || s.storeType == tikvrpc.TiFlash)
return s.getResolveState() == resolved && (s.StoreType() == tikvrpc.TiKV || s.StoreType() == tikvrpc.TiFlash)
})
}

@@ -2703,14 +2703,11 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
state := store.getResolveState()
switch state {
case resolved, needCheck:
addr = store.addr
addr = store.GetAddr()
return
case unresolved:
addr, err = store.initResolve(bo, c.stores)
return
case deleted:
addr = c.changeToActiveStore(region, store.storeID)
return
case tombstone:
return "", nil
default:
@@ -2719,7 +2716,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
}

func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
if !c.enableForwarding || store.storeType != tikvrpc.TiKV || store.getLivenessState() == reachable {
if !c.enableForwarding || store.StoreType() != tikvrpc.TiKV || store.getLivenessState() == reachable {
return
}

@@ -2760,29 +2757,6 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor
return nil, 0, 0
}

// changeToActiveStore replace the deleted store in the region by an up-to-date store in the stores map.
// The order is guaranteed by reResolve() which adds the new store before marking old store deleted.
func (c *RegionCache) changeToActiveStore(region *Region, storeID uint64) (addr string) {
store, _ := c.stores.get(storeID)
for {
oldRegionStore := region.getStore()
newRegionStore := oldRegionStore.clone()
newRegionStore.stores = make([]*Store, 0, len(oldRegionStore.stores))
for _, s := range oldRegionStore.stores {
if s.storeID == store.storeID {
newRegionStore.stores = append(newRegionStore.stores, store)
} else {
newRegionStore.stores = append(newRegionStore.stores, s)
}
}
if region.compareAndSwapStore(oldRegionStore, newRegionStore) {
break
}
}
addr = store.addr
return
}

// OnBucketVersionNotMatch removes the old buckets meta if the version is stale.
func (c *RegionCache) OnBucketVersionNotMatch(ctx *RPCContext, version uint64, keys [][]byte) {
r := c.GetCachedRegionWithRLock(ctx.Region)
@@ -2841,7 +2815,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
return false, err
}
var initLeaderStoreID uint64
if ctx.Store.storeType == tikvrpc.TiFlash {
if ctx.Store.StoreType() == tikvrpc.TiFlash {
initLeaderStoreID = region.findElectableStoreID()
} else {
initLeaderStoreID = ctx.Store.storeID
@@ -2874,7 +2848,7 @@ func (c *RegionCache) PDClient() pd.Client {
// stores so that users won't be bothered by tombstones. (related issue: https://github.com/pingcap/tidb/issues/46602)
func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store {
return c.stores.filter(nil, func(s *Store) bool {
return s.storeType == tikvrpc.TiFlash && labelFilter(s.labels) && s.getResolveState() == resolved
return s.StoreType() == tikvrpc.TiFlash && labelFilter(s.GetLabels()) && s.getResolveState() == resolved
})
}

128 changes: 109 additions & 19 deletions internal/locate/region_cache_test.go
@@ -42,6 +42,7 @@ import (
"fmt"
"math/rand"
"reflect"
"slices"
"sync"
"sync/atomic"
"testing"
@@ -364,7 +365,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() {
}
stores := s.cache.stores.filter(nil, func(s *Store) bool { return s.IsLabelsMatch(labels) })
s.Equal(len(stores), 1)
s.Equal(stores[0].labels, labels)
s.Equal(stores[0].GetLabels(), labels)
}
}

@@ -442,14 +443,13 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
store = cache.stores.getOrInsertDefault(s.store1)
store.initResolve(bo, cache.stores)
s.Equal(store.getResolveState(), resolved)
s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
s.cluster.UpdateStoreAddr(s.store1, store.GetAddr()+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
Copilot AI Jan 27, 2026

The comment above this block still describes the old "deleted + new store" behavior, but the implementation now updates store metadata in-place. Please update the comment to match the new semantics so the test remains readable and doesn't mislead future debugging.

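A possible rewording for that comment, assuming the in-place semantics described here (the original comment text sits outside this hunk, so this is only a hedged suggestion):

// Check that updating the store's address and labels keeps the same *Store
// object: reResolve now updates the metadata in place instead of marking the
// old store deleted and inserting a replacement.
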
cache.stores.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), deleted)
newStore := cache.stores.getOrInsertDefault(s.store1)
s.Equal(newStore.getResolveState(), resolved)
s.Equal(newStore.addr, store.addr+"0")
s.Equal(newStore.labels, []*metapb.StoreLabel{{Key: "k", Value: "v"}})
s.Equal(newStore.GetAddr(), store.GetAddr())
s.Equal(newStore.GetLabels(), []*metapb.StoreLabel{{Key: "k", Value: "v"}})
Comment on lines 449 to +452
Copilot AI Jan 27, 2026

The assertions here no longer verify that the store address actually changed after UpdateStoreAddr. Since newStore and store are the same pointer with the new in-place update behavior, newStore.GetAddr() == store.GetAddr() is tautological and would pass even if the update didn't happen. Save the old address before the update and assert that the current address equals the expected new value (and optionally assert pointer equality to ensure the store object wasn't replaced).

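A sketch of how the check could be made non-tautological, as this comment suggests; it reuses the surrounding test's helpers (cache, waitResolve, the suite's assertions) and assumes the in-place update semantics described above:

// Capture the old address, then assert the in-place update actually happened.
oldAddr := store.GetAddr()
s.cluster.UpdateStoreAddr(s.store1, oldAddr+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
cache.stores.markStoreNeedCheck(store)
waitResolve(store)
newStore := cache.stores.getOrInsertDefault(s.store1)
s.Same(store, newStore)                  // same *Store object, updated in place rather than replaced
s.Equal(oldAddr+"0", newStore.GetAddr()) // the address actually changed
s.Equal(newStore.GetLabels(), []*metapb.StoreLabel{{Key: "k", Value: "v"}})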

// Check initResolve()ing a tombstone store. The resolve state should be tombstone.
cache.clear()
@@ -2301,16 +2301,13 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
startHealthCheckLoop(s.cache.bg, s.cache.stores, store1, livenessState(store1Liveness), time.Second)

// update store meta
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)

// assert that the old store should be deleted and it's not reachable
s.Eventually(func() bool {
return store1.getResolveState() == deleted && store1.getLivenessState() != reachable
}, 3*time.Second, time.Second)
s.cluster.UpdateStoreAddr(store1.storeID, store1.GetAddr()+"'", store1.GetLabels()...)

// assert that the new store should be added and it's also not reachable
newStore1, _ := s.cache.stores.get(store1.storeID)
s.Require().NotEqual(reachable, newStore1.getLivenessState())
// assert that the old store in region cache is replaced
s.Require().Equal(store1.GetAddr(), newStore1.GetAddr())

Comment on lines 2306 to 2311
Copilot AI Jan 27, 2026

This test still talks about a "new store" being added/replacing the old one, but the new implementation keeps the same Store pointer and updates metadata in-place. The current checks (NotEqual(reachable, newStore1.getLivenessState()) and Equal(store1.GetAddr(), newStore1.GetAddr())) don't validate the address update (and the addr equality is trivially true if the pointer is unchanged). Consider asserting that the address eventually becomes the updated value (old+"'") and/or that newStore1 == store1 to verify pointer consistency.

Contributor

+1, examples like internal/locate/region_cache_test.go:440 and internal/locate/region_cache_test.go:2306 don't really validate the intended behavior anymore after the refactor.
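
A sketch of the stricter assertions suggested above, reusing the surrounding test's names and assuming the same Store object is kept and its metadata updated in place:

// Verify the address update itself, not just liveness.
oldAddr := store1.GetAddr()
s.cluster.UpdateStoreAddr(store1.storeID, oldAddr+"'", store1.GetLabels()...)
newStore1, _ := s.cache.stores.get(store1.storeID)
s.Require().Same(store1, newStore1) // metadata is updated on the same *Store, no replacement
s.Eventually(func() bool {
return newStore1.GetAddr() == oldAddr+"'" // the address eventually reflects the update
}, 3*time.Second, time.Second)
s.Require().NotEqual(reachable, newStore1.getLivenessState())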

// recover store1
atomic.StoreUint32(&store1Liveness, uint32(reachable))
@@ -3119,9 +3116,9 @@ func (s *testRegionCacheSuite) TestIssue1401() {
s.Require().NotNil(store1)
s.Require().Equal(resolved, store1.getResolveState())
// change store1 label.
labels := store1.labels
labels := slices.Clone(store1.GetLabels())
labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"})
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...)
s.cluster.UpdateStoreAddr(store1.storeID, store1.GetAddr(), labels...)

// mark the store is unreachable and need check.
atomic.StoreUint32(&store1.livenessState, uint32(unreachable))
@@ -3140,16 +3137,109 @@ func (s *testRegionCacheSuite) TestIssue1401() {
return s.getResolveState() == needCheck
})

// assert that the old store should be deleted.
s.Eventually(func() bool {
return store1.getResolveState() == deleted
}, 3*time.Second, time.Second)
// assert the new store should be added and it should be resolved and reachable.
newStore1, _ := s.cache.stores.get(s.store1)
// the store pointer should be updated
s.Equal(newStore1.GetAddr(), store1.GetAddr())
s.Eventually(func() bool {
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
return newStore1.getResolveState() == resolved && store1.getLivenessState() == reachable
}, 3*time.Second, time.Second)
Comment on lines 3140 to 3146
Copilot AI Jan 27, 2026

This test is meant to cover pointer consistency between caches (issue #1401/#1823), but it currently only compares addresses. If the goal is to ensure the store object is updated in-place (no replacement), add an assertion that newStore1 and store1 are the same pointer, and use newStore1.getLivenessState() in the Eventually condition to ensure you're checking the object actually used by the cache map.

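A sketch of the pointer-consistency variant this comment asks for, reusing the test's names (Same is testify's pointer-equality assertion):

// Assert the cache still holds the same *Store, and check liveness on that object.
newStore1, _ := s.cache.stores.get(s.store1)
s.Require().Same(store1, newStore1) // updated in place, not replaced
s.Eventually(func() bool {
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
}, 3*time.Second, time.Second)
s.Require().True(isStoreContainLabel(newStore1.GetLabels(), "host", "0.0.0.0:20161"))
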
s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161"))
s.Require().True(isStoreContainLabel(newStore1.GetLabels(), "host", "0.0.0.0:20161"))
}

// TestStoreRestartWithNewLabels verifies that the store in replica selector still has
// reference to the old store when the store is restarted with new labels.
func (s *testRegionCacheSuite) TestStoreRestartWithNewLabels() {
// Initialize region cache
loc, err := s.cache.LocateKey(s.bo, []byte("a"))
s.Require().NoError(err)
s.Require().NotNil(loc)

// Get the leader store (store1 is typically the leader)
store1, exists := s.cache.stores.get(s.store1)
s.Require().True(exists)
s.Require().NotNil(store1)
s.Require().Equal(resolved, store1.getResolveState())

// Step 1: Store starts as reachable (up)
atomic.StoreUint32(&store1.livenessState, uint32(reachable))
s.Require().Equal(reachable, store1.getLivenessState())

// Step 2: Store becomes unreachable (down)
store1Liveness := uint32(unreachable)
s.cache.stores.setMockRequestLiveness(func(ctx context.Context, st *Store) livenessState {
if st.storeID == store1.storeID {
return livenessState(atomic.LoadUint32(&store1Liveness))
}
return reachable
})
atomic.StoreUint32(&store1.livenessState, uint32(unreachable))
s.Require().Equal(unreachable, store1.getLivenessState())

// Start health check loop to monitor store
startHealthCheckLoop(s.cache.bg, s.cache.stores, store1, unreachable, time.Second)

// Create replica selector BEFORE store replacement to capture old store reference
region := s.cache.GetCachedRegionWithRLock(loc.Region)
s.Require().NotNil(region)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
replicaSelector, err := newReplicaSelector(s.cache, loc.Region, req)
s.Require().NoError(err)
s.Require().NotNil(replicaSelector)

// Find the replica with store1 to keep reference to old store
var oldReplica *replica
var leaderPeer *metapb.Peer
for _, replica := range replicaSelector.replicas {
if replica.store.storeID == s.store1 {
oldReplica = replica
// Find the peer for this store
for _, peer := range region.GetMeta().Peers {
if peer.StoreId == s.store1 {
leaderPeer = peer
break
}
}
s.Require().NotNil(leaderPeer, "should find peer for store1")
s.Require().Equal(store1, replica.store, "replica should reference old store before replacement")
s.Require().Equal(unreachable, replica.store.getLivenessState(), "old store should be unreachable")
break
}
}
s.Require().NotNil(oldReplica, "should find replica with store1")

// Step 3: Same store_id changes label and restarts
// Change store1's label (address stays the same)
oldLabels := slices.Clone(store1.labels)
Copilot AI Jan 27, 2026

This test starts a background health-check loop and then reads store1.labels directly. With the new metaMu protecting labels (and reResolve updating them concurrently), this direct field access will trigger data races under go test -race. Use store1.GetLabels() (and clone it before appending) instead of accessing store1.labels directly.

Suggested change:
- oldLabels := slices.Clone(store1.labels)
+ oldLabels := slices.Clone(store1.GetLabels())

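For context, a rough sketch of the kind of accessor this comment relies on. It is an assumption about store_cache.go (not part of this hunk): the real Store type, its metaMu, and the accessor bodies may differ.

package locate

import (
"sync"

"github.com/pingcap/kvproto/pkg/metapb"
)

// Store is trimmed to the fields relevant here; the real struct has many more.
type Store struct {
metaMu sync.RWMutex // assumed to guard the mutable metadata such as labels
addr   string
labels []*metapb.StoreLabel
}

// GetLabels reads labels under the lock, so a concurrent reResolve (which takes
// the write lock while updating metadata in place) cannot race with readers.
func (s *Store) GetLabels() []*metapb.StoreLabel {
s.metaMu.RLock()
defer s.metaMu.RUnlock()
return s.labels
}
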
newLabels := append(oldLabels, &metapb.StoreLabel{Key: "zone", Value: "zone1"})
s.cluster.UpdateStoreAddr(store1.storeID, store1.GetAddr(), newLabels...)

// Recover store1 (simulate restart) - set liveness back to reachable for new store
atomic.StoreUint32(&store1Liveness, uint32(reachable))

// Trigger store metadata update by marking old store as needCheck and resolving
// This should trigger store_cache.go:494-497 logic: updating store metadata and resolving the store
store1.setResolveState(needCheck)
s.cache.checkAndResolve(nil, func(st *Store) bool {
return st.getResolveState() == needCheck
})

// reResolve is synchronous, but add a small wait for robustness (max 500ms like TestResolveStateTransition)
s.Eventually(func() bool {
return store1.getResolveState() == resolved
}, 500*time.Millisecond, 50*time.Millisecond, "old store should be resolved")

// The store's labels should be updated
s.Require().True(isStoreContainLabel(store1.labels, "zone", "zone1"), "new store should have updated labels")
Copilot AI Jan 27, 2026

Same issue as above: this assertion reads store1.labels directly while the health-check loop can update labels concurrently, which will fail under the repo's go test -race CI. Use store1.GetLabels() here.

Suggested change:
- s.Require().True(isStoreContainLabel(store1.labels, "zone", "zone1"), "new store should have updated labels")
+ s.Require().True(isStoreContainLabel(store1.GetLabels(), "zone", "zone1"), "new store should have updated labels")

// Wait for new store to become reachable after restart
s.Eventually(func() bool {
return store1.getLivenessState() == reachable
}, 3*time.Second, 200*time.Millisecond, "new store should become reachable after restart")

// Step 4: Test the replica selector still has reference to OLD store (created before replacement)
s.Require().Equal(store1, oldReplica.store, "replica should still reference old store")
// The old store should be reachable
s.Require().Equal(reachable, oldReplica.store.getLivenessState(), "old store in replica should still be unreachable")
Copilot AI Jan 27, 2026

The assertion message contradicts the expected value: it asserts reachable but the message says "should still be unreachable". Please update the message to reflect the intended expectation to avoid confusion when diagnosing failures.

Suggested change:
- s.Require().Equal(reachable, oldReplica.store.getLivenessState(), "old store in replica should still be unreachable")
+ s.Require().Equal(reachable, oldReplica.store.getLivenessState(), "old store in replica should still be reachable")

}

func BenchmarkBatchLocateKeyRangesFromCache(t *testing.B) {