
Commit

This is an automated cherry-pick of tikv#4942
close tikv#4941

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
rleungx authored and ti-chi-bot committed May 16, 2022
1 parent b99630d commit e063d36
Showing 3 changed files with 202 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/cluster/cluster.go
@@ -290,6 +290,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
		return nil, nil
	}

	c.core.ResetStores()
	start := time.Now()
	if err := c.storage.LoadStores(c.core.PutStore); err != nil {
		return nil, err
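For context, the hunk above is the whole fix: LoadClusterInfo now clears the store cache before repopulating it from storage. It matters because the cache is filled through PutStore, which (per the basic_cluster.go hunk below) only sets entries and never prunes them, so a store removed while another PD member held leadership could linger in memory once leadership moved back — the scenario TestTransferLeaderBack exercises. The following is a minimal, runnable sketch of that failure mode using a hypothetical map-backed cache, not PD's actual StoresInfo/BasicCluster types.

package main

import "fmt"

type storeCache struct {
	stores map[uint64]string // store ID -> address, a stand-in for *StoreInfo
}

func newStoreCache() *storeCache {
	return &storeCache{stores: make(map[uint64]string)}
}

// Put mirrors BasicCluster.PutStore: it only adds or overwrites, never prunes.
func (c *storeCache) Put(id uint64, addr string) { c.stores[id] = addr }

// Reset mirrors the new BasicCluster.ResetStores.
func (c *storeCache) Reset() { c.stores = make(map[uint64]string) }

func main() {
	cache := newStoreCache()
	cache.Put(1, "127.0.0.1:20160") // loaded while this member was PD leader
	cache.Put(2, "127.0.0.1:20161")

	// Meanwhile another leader tombstones store 1 and removes its record,
	// so the persisted state now only contains store 2.
	persisted := map[uint64]string{2: "127.0.0.1:20161"}

	// Reload without resetting: the stale store 1 survives.
	for id, addr := range persisted {
		cache.Put(id, addr)
	}
	fmt.Println("without reset:", len(cache.stores)) // 2

	// Reload with the reset first, as the patched LoadClusterInfo does.
	cache.Reset()
	for id, addr := range persisted {
		cache.Put(id, addr)
	}
	fmt.Println("with reset:", len(cache.stores)) // 1
}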
7 changes: 7 additions & 0 deletions server/core/basic_cluster.go
@@ -327,6 +327,13 @@ func (bc *BasicCluster) PutStore(store *StoreInfo) {
	bc.Stores.SetStore(store)
}

// ResetStores resets the store cache.
func (bc *BasicCluster) ResetStores() {
	bc.Lock()
	defer bc.Unlock()
	bc.Stores = NewStoresInfo()
}

// DeleteStore deletes a store.
func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
	bc.Lock()
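The new helper swaps in a fresh StoresInfo under the write lock instead of deleting entries one by one, so readers only ever see the old cache or an empty one. Below is a small standalone sketch of that locking pattern, assuming a simplified stand-in type (a plain map guarded by sync.RWMutex) rather than PD's actual BasicCluster.

package main

import (
	"fmt"
	"sync"
)

// basicCluster is a simplified stand-in for PD's BasicCluster.
type basicCluster struct {
	sync.RWMutex
	stores map[uint64]struct{}
}

// PutStore adds or overwrites a store under the write lock.
func (bc *basicCluster) PutStore(id uint64) {
	bc.Lock()
	defer bc.Unlock()
	bc.stores[id] = struct{}{}
}

// ResetStores swaps in a fresh container wholesale, mirroring the new helper:
// concurrent readers see either the old cache or an empty one, never a
// partially cleared map.
func (bc *basicCluster) ResetStores() {
	bc.Lock()
	defer bc.Unlock()
	bc.stores = make(map[uint64]struct{})
}

// StoreCount reads under the read lock.
func (bc *basicCluster) StoreCount() int {
	bc.RLock()
	defer bc.RUnlock()
	return len(bc.stores)
}

func main() {
	bc := &basicCluster{stores: make(map[uint64]struct{})}
	bc.PutStore(1)
	bc.PutStore(2)
	bc.ResetStores()
	fmt.Println(bc.StoreCount()) // 0
}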
194 changes: 194 additions & 0 deletions tests/server/cluster/cluster_test.go
@@ -1113,3 +1113,197 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) {
	err = rc.HandleRegionHeartbeat(region)
	c.Assert(err, IsNil)
}
<<<<<<< HEAD
=======

func (s *clusterTestSuite) putRegionWithLeader(c *C, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) {
	for i := 0; i < 3; i++ {
		regionID, err := id.Alloc()
		c.Assert(err, IsNil)
		peerID, err := id.Alloc()
		c.Assert(err, IsNil)
		region := &metapb.Region{
			Id:       regionID,
			Peers:    []*metapb.Peer{{Id: peerID, StoreId: storeID}},
			StartKey: []byte{byte(i)},
			EndKey:   []byte{byte(i + 1)},
		}
		rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
	}
	c.Assert(rc.GetStore(storeID).GetLeaderCount(), Equals, 3)
}

func (s *clusterTestSuite) checkMinResolvedTSFromStorage(c *C, rc *cluster.RaftCluster, expect uint64) {
	time.Sleep(time.Millisecond * 10)
	ts2, err := rc.GetStorage().LoadMinResolvedTS()
	c.Assert(err, IsNil)
	c.Assert(ts2, Equals, expect)
}

func (s *clusterTestSuite) setMinResolvedTSPersistenceInterval(c *C, rc *cluster.RaftCluster, svr *server.Server, interval time.Duration) {
	cfg := rc.GetOpts().GetPDServerConfig().Clone()
	cfg.MinResolvedTSPersistenceInterval = typeutil.NewDuration(interval)
	err := svr.SetPDServerConfig(*cfg)
	c.Assert(err, IsNil)
	time.Sleep(time.Millisecond + interval)
}

func (s *clusterTestSuite) TestMinResolvedTS(c *C) {
	cluster.DefaultMinResolvedTSPersistenceInterval = time.Millisecond
	tc, err := tests.NewTestCluster(s.ctx, 1)
	defer tc.Destroy()
	c.Assert(err, IsNil)
	err = tc.RunInitialServers()
	c.Assert(err, IsNil)
	tc.WaitLeader()
	leaderServer := tc.GetServer(tc.GetLeader())
	id := leaderServer.GetAllocator()
	grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr())
	clusterID := leaderServer.GetClusterID()
	bootstrapCluster(c, clusterID, grpcPDClient)
	rc := leaderServer.GetRaftCluster()
	c.Assert(rc, NotNil)
	svr := leaderServer.GetServer()
	addStoreAndCheckMinResolvedTS := func(c *C, isTiflash bool, minResolvedTS, expect uint64) uint64 {
		storeID, err := id.Alloc()
		c.Assert(err, IsNil)
		store := &metapb.Store{
			Id:      storeID,
			Version: "v6.0.0",
			Address: "127.0.0.1:" + strconv.Itoa(int(storeID)),
		}
		if isTiflash {
			store.Labels = []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}
		}
		_, err = putStore(grpcPDClient, clusterID, store)
		c.Assert(err, IsNil)
		req := &pdpb.ReportMinResolvedTsRequest{
			Header:        testutil.NewRequestHeader(clusterID),
			StoreId:       storeID,
			MinResolvedTs: minResolvedTS,
		}
		_, err = grpcPDClient.ReportMinResolvedTS(context.Background(), req)
		c.Assert(err, IsNil)
		ts := rc.GetMinResolvedTS()
		c.Assert(ts, Equals, expect)
		return storeID
	}

	// case1: the cluster is not initialized;
	// min resolved ts should not be available
	status, err := rc.LoadClusterStatus()
	c.Assert(err, IsNil)
	c.Assert(status.IsInitialized, IsFalse)
	store1TS := uint64(233)
	store1 := addStoreAndCheckMinResolvedTS(c, false /* not tiflash */, store1TS, math.MaxUint64)

	// case2: add a leader peer to store1 but do not run the job;
	// min resolved ts should be zero
	s.putRegionWithLeader(c, rc, id, store1)
	ts := rc.GetMinResolvedTS()
	c.Assert(ts, Equals, uint64(0))

	// case3: add a leader peer to store1 and run the job;
	// min resolved ts should be store1TS
	s.setMinResolvedTSPersistenceInterval(c, rc, svr, time.Millisecond)
	ts = rc.GetMinResolvedTS()
	c.Assert(ts, Equals, store1TS)
	s.checkMinResolvedTSFromStorage(c, rc, ts)

	// case4: add a tiflash store;
	// min resolved ts should not change
	addStoreAndCheckMinResolvedTS(c, true /* is tiflash */, 0, store1TS)

	// case5: add a new store with a larger min resolved ts;
	// min resolved ts should not change
	store3TS := store1TS + 10
	store3 := addStoreAndCheckMinResolvedTS(c, false /* not tiflash */, store3TS, store1TS)
	s.putRegionWithLeader(c, rc, id, store3)

	// case6: set store1 to tombstone;
	// min resolved ts should change to store3TS
	resetStoreState(c, rc, store1, metapb.StoreState_Tombstone)
	ts = rc.GetMinResolvedTS()
	c.Assert(ts, Equals, store3TS)

	// case7: add a store with a leader peer but no min resolved ts report;
	// min resolved ts should not change
	s.checkMinResolvedTSFromStorage(c, rc, store3TS)
	store4 := addStoreAndCheckMinResolvedTS(c, false /* not tiflash */, 0, store3TS)
	s.putRegionWithLeader(c, rc, id, store4)
	ts = rc.GetMinResolvedTS()
	c.Assert(ts, Equals, store3TS)
	s.checkMinResolvedTSFromStorage(c, rc, store3TS)
	resetStoreState(c, rc, store4, metapb.StoreState_Tombstone)

	// case8: set the min resolved ts persistence interval to zero;
	// although min resolved ts increases, it should not be persisted until the job runs
	store5TS := store3TS + 10
	s.setMinResolvedTSPersistenceInterval(c, rc, svr, 0)
	store5 := addStoreAndCheckMinResolvedTS(c, false /* not tiflash */, store5TS, store3TS)
	resetStoreState(c, rc, store3, metapb.StoreState_Tombstone)
	s.putRegionWithLeader(c, rc, id, store5)
	ts = rc.GetMinResolvedTS()
	c.Assert(ts, Equals, store3TS)
	s.setMinResolvedTSPersistenceInterval(c, rc, svr, time.Millisecond)
	ts = rc.GetMinResolvedTS()
	c.Assert(ts, Equals, store5TS)
}

// See https://github.com/tikv/pd/issues/4941
func (s *clusterTestSuite) TestTransferLeaderBack(c *C) {
	tc, err := tests.NewTestCluster(s.ctx, 2)
	defer tc.Destroy()
	c.Assert(err, IsNil)
	err = tc.RunInitialServers()
	c.Assert(err, IsNil)
	tc.WaitLeader()
	leaderServer := tc.GetServer(tc.GetLeader())
	svr := leaderServer.GetServer()
	rc := cluster.NewRaftCluster(s.ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
	rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster())
	storage := rc.GetStorage()
	meta := &metapb.Cluster{Id: 123}
	c.Assert(storage.SaveMeta(meta), IsNil)
	n := 4
	stores := make([]*metapb.Store, 0, n)
	for i := 1; i <= n; i++ {
		store := &metapb.Store{Id: uint64(i), State: metapb.StoreState_Up}
		stores = append(stores, store)
	}

	for _, store := range stores {
		c.Assert(storage.SaveStore(store), IsNil)
	}
	rc, err = rc.LoadClusterInfo()
	c.Assert(err, IsNil)
	c.Assert(rc, NotNil)
	// offline a store
	c.Assert(rc.RemoveStore(1, false), IsNil)
	c.Assert(rc.GetStore(1).GetState(), Equals, metapb.StoreState_Offline)

	// transfer PD leader to another PD
	tc.ResignLeader()
	tc.WaitLeader()
	leaderServer = tc.GetServer(tc.GetLeader())
	svr1 := leaderServer.GetServer()
	rc1 := svr1.GetRaftCluster()
	c.Assert(err, IsNil)
	c.Assert(rc1, NotNil)
	// tombstone a store, and remove its record
	c.Assert(rc1.BuryStore(1, false), IsNil)
	c.Assert(rc1.RemoveTombStoneRecords(), IsNil)

	// transfer PD leader back to the previous PD
	tc.ResignLeader()
	tc.WaitLeader()
	leaderServer = tc.GetServer(tc.GetLeader())
	svr = leaderServer.GetServer()
	rc = svr.GetRaftCluster()
	c.Assert(rc, NotNil)

	// check store count
	c.Assert(rc.GetMetaCluster(), DeepEquals, meta)
	c.Assert(rc.GetStoreCount(), Equals, 3)
}
>>>>>>> d12705428 (cluster: reset stores cache before loading cluster info (#4942))
