fix inconsistent store pointers between the region-cache and store-cache #1826
base: master

@@ -42,6 +42,7 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
+	"slices"
 	"sync"
 	"sync/atomic"
 	"testing"

@@ -364,7 +365,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() {
 		}
 		stores := s.cache.stores.filter(nil, func(s *Store) bool { return s.IsLabelsMatch(labels) })
 		s.Equal(len(stores), 1)
-		s.Equal(stores[0].labels, labels)
+		s.Equal(stores[0].GetLabels(), labels)
 	}
 }

@@ -442,14 +443,13 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
 	store = cache.stores.getOrInsertDefault(s.store1)
 	store.initResolve(bo, cache.stores)
 	s.Equal(store.getResolveState(), resolved)
-	s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
+	s.cluster.UpdateStoreAddr(s.store1, store.GetAddr()+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
 	cache.stores.markStoreNeedCheck(store)
 	waitResolve(store)
 	s.Equal(store.getResolveState(), deleted)
 	newStore := cache.stores.getOrInsertDefault(s.store1)
 	s.Equal(newStore.getResolveState(), resolved)
-	s.Equal(newStore.addr, store.addr+"0")
-	s.Equal(newStore.labels, []*metapb.StoreLabel{{Key: "k", Value: "v"}})
+	s.Equal(newStore.GetAddr(), store.GetAddr())
+	s.Equal(newStore.GetLabels(), []*metapb.StoreLabel{{Key: "k", Value: "v"}})

 	// Check initResolve()ing a tombstone store. The resolve state should be tombstone.
 	cache.clear()

@@ -2301,16 +2301,13 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
 	startHealthCheckLoop(s.cache.bg, s.cache.stores, store1, livenessState(store1Liveness), time.Second)

 	// update store meta
-	s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)
-
-	// assert that the old store should be deleted and it's not reachable
-	s.Eventually(func() bool {
-		return store1.getResolveState() == deleted && store1.getLivenessState() != reachable
-	}, 3*time.Second, time.Second)
+	s.cluster.UpdateStoreAddr(store1.storeID, store1.GetAddr()+"'", store1.GetLabels()...)

 	// assert that the new store should be added and it's also not reachable
 	newStore1, _ := s.cache.stores.get(store1.storeID)
 	s.Require().NotEqual(reachable, newStore1.getLivenessState())
+	// assert that the old store in region cache is replaced
+	s.Require().Equal(store1.GetAddr(), newStore1.GetAddr())

 	// recover store1
 	atomic.StoreUint32(&store1Liveness, uint32(reachable))

@@ -3119,9 +3116,9 @@ func (s *testRegionCacheSuite) TestIssue1401() {
 	s.Require().NotNil(store1)
 	s.Require().Equal(resolved, store1.getResolveState())
 	// change store1 label.
-	labels := store1.labels
+	labels := slices.Clone(store1.GetLabels())
 	labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"})
-	s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...)
+	s.cluster.UpdateStoreAddr(store1.storeID, store1.GetAddr(), labels...)

 	// mark the store is unreachable and need check.
 	atomic.StoreUint32(&store1.livenessState, uint32(unreachable))

@@ -3140,16 +3137,109 @@ func (s *testRegionCacheSuite) TestIssue1401() {
 		return s.getResolveState() == needCheck
 	})

-	// assert that the old store should be deleted.
-	s.Eventually(func() bool {
-		return store1.getResolveState() == deleted
-	}, 3*time.Second, time.Second)
-	// assert the new store should be added and it should be resolved and reachable.
 	newStore1, _ := s.cache.stores.get(s.store1)
+	// the store pointer should be updated
+	s.Equal(newStore1.GetAddr(), store1.GetAddr())
 	s.Eventually(func() bool {
-		return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
+		return newStore1.getResolveState() == resolved && store1.getLivenessState() == reachable
 	}, 3*time.Second, time.Second)

-	s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161"))
+	s.Require().True(isStoreContainLabel(newStore1.GetLabels(), "host", "0.0.0.0:20161"))
 }

+// TestStoreRestartWithNewLabels verifies that the store in replica selector still has
+// reference to the old store when the store is restarted with new labels.
+func (s *testRegionCacheSuite) TestStoreRestartWithNewLabels() {
+	// Initialize region cache
+	loc, err := s.cache.LocateKey(s.bo, []byte("a"))
+	s.Require().NoError(err)
+	s.Require().NotNil(loc)
+
+	// Get the leader store (store1 is typically the leader)
+	store1, exists := s.cache.stores.get(s.store1)
+	s.Require().True(exists)
+	s.Require().NotNil(store1)
+	s.Require().Equal(resolved, store1.getResolveState())
+
+	// Step 1: Store starts as reachable (up)
+	atomic.StoreUint32(&store1.livenessState, uint32(reachable))
+	s.Require().Equal(reachable, store1.getLivenessState())
+
+	// Step 2: Store becomes unreachable (down)
+	store1Liveness := uint32(unreachable)
+	s.cache.stores.setMockRequestLiveness(func(ctx context.Context, st *Store) livenessState {
+		if st.storeID == store1.storeID {
+			return livenessState(atomic.LoadUint32(&store1Liveness))
+		}
+		return reachable
+	})
+	atomic.StoreUint32(&store1.livenessState, uint32(unreachable))
+	s.Require().Equal(unreachable, store1.getLivenessState())
+
+	// Start health check loop to monitor store
+	startHealthCheckLoop(s.cache.bg, s.cache.stores, store1, unreachable, time.Second)
+
+	// Create replica selector BEFORE store replacement to capture old store reference
+	region := s.cache.GetCachedRegionWithRLock(loc.Region)
+	s.Require().NotNil(region)
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
+	replicaSelector, err := newReplicaSelector(s.cache, loc.Region, req)
+	s.Require().NoError(err)
+	s.Require().NotNil(replicaSelector)
+
+	// Find the replica with store1 to keep reference to old store
+	var oldReplica *replica
+	var leaderPeer *metapb.Peer
+	for _, replica := range replicaSelector.replicas {
+		if replica.store.storeID == s.store1 {
+			oldReplica = replica
+			// Find the peer for this store
+			for _, peer := range region.GetMeta().Peers {
+				if peer.StoreId == s.store1 {
+					leaderPeer = peer
+					break
+				}
+			}
+			s.Require().NotNil(leaderPeer, "should find peer for store1")
+			s.Require().Equal(store1, replica.store, "replica should reference old store before replacement")
+			s.Require().Equal(unreachable, replica.store.getLivenessState(), "old store should be unreachable")
+			break
+		}
+	}
+	s.Require().NotNil(oldReplica, "should find replica with store1")
+
+	// Step 3: Same store_id changes label and restarts
+	// Change store1's label (address stays the same)
+	oldLabels := slices.Clone(store1.labels)

Suggested change:
-	oldLabels := slices.Clone(store1.labels)
+	oldLabels := slices.Clone(store1.GetLabels())

Copilot AI (Jan 27, 2026):

Same issue as above: this assertion reads store1.labels directly while the health-check loop can update labels concurrently, which will fail under the repo's go test -race CI. Use store1.GetLabels() here.

Suggested change:
-	s.Require().True(isStoreContainLabel(store1.labels, "zone", "zone1"), "new store should have updated labels")
+	s.Require().True(isStoreContainLabel(store1.GetLabels(), "zone", "zone1"), "new store should have updated labels")
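
For context on why the race detector cares, here is a minimal, self-contained sketch of the accessor pattern the comment is asking for. The `store`, `label`, `GetLabels`, and `setLabels` names below are illustrative stand-ins, not client-go's actual types; the point is only that a field rewritten by a background goroutine must be read through a lock-guarded getter instead of directly.

```go
package main

import (
	"fmt"
	"sync"
)

// label is an illustrative stand-in for *metapb.StoreLabel.
type label struct {
	Key, Value string
}

// store is a hypothetical stand-in for a cached store entry whose labels can
// be rewritten by a background health-check goroutine.
type store struct {
	mu     sync.RWMutex
	labels []label
}

// GetLabels returns a copy of the labels under the read lock, so a caller
// never observes a concurrent write to the underlying slice.
func (s *store) GetLabels() []label {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]label, len(s.labels))
	copy(out, s.labels)
	return out
}

// setLabels replaces the labels under the write lock, e.g. after the store
// is re-resolved with new metadata.
func (s *store) setLabels(ls []label) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.labels = ls
}

func main() {
	st := &store{labels: []label{{Key: "zone", Value: "zone0"}}}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// Simulates the health-check loop refreshing store metadata.
		defer wg.Done()
		st.setLabels([]label{{Key: "zone", Value: "zone1"}})
	}()

	// Reading st.labels here instead would be flagged by `go test -race`;
	// the guarded accessor is safe.
	fmt.Println(st.GetLabels())
	wg.Wait()
}
```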

Copilot AI (Jan 27, 2026):

The assertion message contradicts the expected value: it asserts reachable but the message says "should still be unreachable". Please update the message to reflect the intended expectation to avoid confusion when diagnosing failures.

Suggested change:
-	s.Require().Equal(reachable, oldReplica.store.getLivenessState(), "old store in replica should still be unreachable")
+	s.Require().Equal(reachable, oldReplica.store.getLivenessState(), "old store in replica should still be reachable")

Reviewer comment:

The comment above this block still describes the old "deleted + new store" behavior, but the implementation now updates store metadata in place. Please update the comment to match the new semantics so the test remains readable and doesn't mislead future debugging.
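
To make the "in-place" point concrete, here is a small sketch under assumed names (`storeMeta` and `updateInPlace` are hypothetical, not client-go's API). Because the cached entry is mutated rather than deleted and re-inserted, a component that captured the pointer earlier, such as a replica selector, keeps seeing metadata consistent with the store cache.

```go
package main

import (
	"fmt"
	"sync"
)

// storeMeta is an illustrative stand-in for a cached store entry; the names
// here are hypothetical and not client-go's actual API.
type storeMeta struct {
	mu     sync.RWMutex
	id     uint64
	addr   string
	labels map[string]string
}

func (s *storeMeta) GetAddr() string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.addr
}

// updateInPlace mutates the existing entry instead of inserting a fresh one,
// so every holder of the *storeMeta pointer observes the refreshed metadata.
func (s *storeMeta) updateInPlace(addr string, labels map[string]string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.addr = addr
	s.labels = labels
}

func main() {
	cache := map[uint64]*storeMeta{
		1: {id: 1, addr: "127.0.0.1:20160", labels: map[string]string{"zone": "zone0"}},
	}

	// A replica selector captures the pointer before the store "restarts".
	captured := cache[1]

	// A delete-and-replace would leave `captured` pointing at stale metadata:
	//   cache[1] = &storeMeta{id: 1, addr: "...", labels: map[string]string{...}}

	// The in-place update keeps the cache entry and the captured pointer
	// consistent with each other.
	cache[1].updateInPlace("127.0.0.1:20160", map[string]string{"zone": "zone1"})

	fmt.Println(captured == cache[1]) // true: still the same pointer
	fmt.Println(captured.GetAddr())   // refreshed metadata is visible
}
```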