Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Oct 16, 2024
1 parent 4e0ca4f commit 8fef7af
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 23 deletions.
45 changes: 24 additions & 21 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,29 +402,32 @@ func (c *RaftCluster) checkSchedulingService() {

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if !c.isAPIServiceMode {
if c.member.IsLeader() {
if err := c.startTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
return
if c.isAPIServiceMode {
return
}
if c.member.IsLeader() {
if err := c.startTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
} else {
// leader exits, reset the allocator group
if err := c.stopTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}

failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
} else {
// leader exits, reset the allocator group
if err := c.stopTSOJobs(); err != nil {
// If there is an error, need to wait for the next check.
return
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}

failpoint.Inject("updateAfterResetTSO", func() {
allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,12 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
return err == nil
})
// Resign leader to trigger the TSO resetting.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/updateAfterResetTSO", "return(true)"))
oldLeaderName := suite.cluster.WaitLeader()
re.NotEmpty(oldLeaderName)
err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/updateAfterResetTSO"))
newLeaderName := suite.cluster.WaitLeader()
re.NotEmpty(newLeaderName)
re.NotEqual(oldLeaderName, newLeaderName)
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/tso/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() {
leaderName := suite.cluster.WaitLeader()
re.NotEmpty(leaderName)
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
suite.pdLeaderServer.BootstrapCluster()
backendEndpoints := suite.pdLeaderServer.GetAddr()
if suite.legacy {
suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints)
Expand Down
3 changes: 3 additions & 0 deletions tests/server/tso/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) {
re.NoError(cluster.RunInitialServers())

cluster.WaitAllLeaders(re, dcLocationConfig)
leaderServer := cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()

// Wait for all nodes becoming healthy.
time.Sleep(time.Second * 5)
Expand Down
3 changes: 3 additions & 0 deletions tests/server/tso/global_tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TestDelaySyncTimestamp(t *testing.T) {
var leaderServer, nextLeaderServer *tests.TestServer
leaderServer = cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
for _, s := range cluster.GetServers() {
if s.GetConfig().Name != cluster.GetLeader() {
nextLeaderServer = s
Expand Down Expand Up @@ -146,6 +147,8 @@ func TestLogicalOverflow(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())

leaderServer := cluster.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()

Expand Down
2 changes: 2 additions & 0 deletions tests/server/tso/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) {
re.NoError(cluster.RunInitialServers())

cluster.WaitAllLeaders(re, dcLocationConfig)
leaderServer := cluster.GetLeaderServer()
leaderServer.BootstrapCluster()
requestLocalTSOs(re, cluster, dcLocationConfig)

// Reboot the cluster.
Expand Down

0 comments on commit 8fef7af

Please sign in to comment.