Add test case to simulate EKS upgrading (restart the entire API cluster then TSO cluster) #6534

Merged
merged 1 commit on May 30, 2023
Add TestUpgradingAPIandTSOClusters
Add TestUpgradingAPIandTSOClusters to test the scenario in which, after we restart the API cluster
and then restart the TSO cluster, the TSO service can still serve TSO requests normally.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
binshi-bing committed May 29, 2023
commit 3a74c2a85fd8394edeceaf7c1527fb45fc147045
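
In outline, the scenario the commit message describes runs as follows. This is a condensed sketch assembled from the test added below, not a verbatim excerpt: error assertions, the pd client setup, and the usePDServiceMode failpoint are omitted, and ctx, re, and pdClient come from that setup.

	// Bring up a 3-node API cluster and a 2-node TSO cluster.
	apiCluster, _ := tests.NewTestAPICluster(ctx, 3)
	_ = apiCluster.RunInitialServers()
	backendEndpoints := apiCluster.GetServer(apiCluster.WaitLeader()).GetAddr()
	tsoCluster, _ := mcs.NewTestTSOCluster(ctx, 2, backendEndpoints)
	tsoCluster.WaitForDefaultPrimaryServing(re)

	// Simulate an EKS upgrade: roll the API cluster first, then the TSO cluster,
	// verifying after each step that TSO requests are still served.
	apiCluster, _ = tests.RestartTestAPICluster(ctx, apiCluster)
	mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)
	tsoCluster, _ = mcs.RestartTestTSOCluster(ctx, tsoCluster)
	mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)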
68 changes: 65 additions & 3 deletions tests/cluster.go
@@ -452,8 +452,8 @@ func createTestCluster(ctx context.Context, initialServerCount int, isAPIService
	schedulers.Register()
	config := newClusterConfig(initialServerCount)
	servers := make(map[string]*TestServer)
-	for _, conf := range config.InitialServers {
-		serverConf, err := conf.Generate(opts...)
+	for _, cfg := range config.InitialServers {
+		serverConf, err := cfg.Generate(opts...)
		if err != nil {
			return nil, err
		}
@@ -466,7 +466,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, isAPIService
		if err != nil {
			return nil, err
		}
-		servers[conf.Name] = s
+		servers[cfg.Name] = s
	}
	return &TestCluster{
		config: config,
@@ -480,6 +480,68 @@ func createTestCluster(ctx context.Context, initialServerCount int, isAPIService
	}, nil
}

// RestartTestAPICluster restarts the API test cluster.
func RestartTestAPICluster(ctx context.Context, cluster *TestCluster) (*TestCluster, error) {
	return restartTestCluster(ctx, cluster, true)
}

func restartTestCluster(
	ctx context.Context, cluster *TestCluster, isAPIServiceMode bool,
) (newTestCluster *TestCluster, err error) {
	schedulers.Register()
	newTestCluster = &TestCluster{
		config:  cluster.config,
		servers: make(map[string]*TestServer, len(cluster.servers)),
		tsPool: struct {
			sync.Mutex
			pool map[uint64]struct{}
		}{
			pool: make(map[uint64]struct{}),
		},
	}

	var serverMap sync.Map
	var errorMap sync.Map
	wg := sync.WaitGroup{}
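	// Concurrently destroy each old server and start a replacement with the same config, recording the result per server name.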
for serverName, server := range newTestCluster.servers {
serverCfg := server.GetConfig()
wg.Add(1)
go func(serverName string, server *TestServer) {
defer wg.Done()
server.Destroy()
var (
newServer *TestServer
serverErr error
)
if isAPIServiceMode {
newServer, serverErr = NewTestAPIServer(ctx, serverCfg)
} else {
newServer, serverErr = NewTestServer(ctx, serverCfg)
}
serverMap.Store(serverName, newServer)
errorMap.Store(serverName, serverErr)
}(serverName, server)
}
wg.Wait()

	// Surface the first error reported by any restart goroutine; otherwise register the new servers in the new cluster.
	errorMap.Range(func(key, value interface{}) bool {
		if value != nil {
			err = value.(error)
			return false
		}
		serverName := key.(string)
		newServer, _ := serverMap.Load(serverName)
		newTestCluster.servers[serverName] = newServer.(*TestServer)
		return true
	})

	if err != nil {
		return nil, errors.New("failed to restart cluster. " + err.Error())
	}

	return newTestCluster, nil
}

// RunServer starts to run TestServer.
func (c *TestCluster) RunServer(server *TestServer) <-chan error {
	resC := make(chan error)
51 changes: 51 additions & 0 deletions tests/integrations/mcs/cluster.go
@@ -17,6 +17,7 @@ package mcs
import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/stretchr/testify/require"
@@ -53,6 +54,56 @@ func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpo
	return tc, nil
}

// RestartTestTSOCluster restarts the TSO test cluster.
func RestartTestTSOCluster(
	ctx context.Context, cluster *TestTSOCluster,
) (newCluster *TestTSOCluster, err error) {
	newCluster = &TestTSOCluster{
		ctx:              ctx,
		backendEndpoints: cluster.backendEndpoints,
		servers:          make(map[string]*tso.Server, len(cluster.servers)),
		cleanupFuncs:     make(map[string]testutil.CleanupFunc, len(cluster.servers)),
	}
	var (
		serverMap  sync.Map
		cleanupMap sync.Map
		errorMap   sync.Map
	)
	wg := sync.WaitGroup{}
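	// Concurrently run each old server's cleanup function and start a replacement TSO server with the same config.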
	for addr, cleanup := range cluster.cleanupFuncs {
		wg.Add(1)
		go func(addr string, clean testutil.CleanupFunc) {
			defer wg.Done()
			clean()
			serverCfg := cluster.servers[addr].GetConfig()
			newServer, newCleanup, err := NewTSOTestServer(newCluster.ctx, serverCfg)
			serverMap.Store(addr, newServer)
			cleanupMap.Store(addr, newCleanup)
			errorMap.Store(addr, err)
		}(addr, cleanup)
	}
	wg.Wait()

	// Surface the first restart error, if any; otherwise adopt the new servers and their cleanup functions.
	errorMap.Range(func(key, value interface{}) bool {
		if value != nil {
			err = value.(error)
			return false
		}
		addr := key.(string)
		newServer, _ := serverMap.Load(addr)
		newCleanup, _ := cleanupMap.Load(addr)
		newCluster.servers[addr] = newServer.(*tso.Server)
		newCluster.cleanupFuncs[addr] = newCleanup.(testutil.CleanupFunc)
		return true
	})

	if err != nil {
		return nil, fmt.Errorf("failed to restart the cluster: %s", err.Error())
	}

	return newCluster, nil
}

// AddServer adds a new TSO server to the test cluster.
func (tc *TestTSOCluster) AddServer(addr string) error {
	cfg := tso.NewConfig()
47 changes: 47 additions & 0 deletions tests/integrations/tso/client_test.go
@@ -451,12 +451,59 @@ func TestMixedTSODeployment(t *testing.T) {
	wg.Wait()
}

// TestUpgradingAPIandTSOClusters tests the scenario in which, after the API cluster is restarted
// and then the TSO cluster is restarted, the TSO service can still serve TSO requests normally.
func TestUpgradingAPIandTSOClusters(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())

	// Create an API cluster which has 3 servers
	apiCluster, err := tests.NewTestAPICluster(ctx, 3)
	re.NoError(err)
	err = apiCluster.RunInitialServers()
	re.NoError(err)
	leaderName := apiCluster.WaitLeader()
	pdLeader := apiCluster.GetServer(leaderName)
	backendEndpoints := pdLeader.GetAddr()

	// Create a pd client in PD mode to let the API leader forward requests to the TSO cluster.
	re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)"))
	pdClient, err := pd.NewClientWithContext(context.Background(),
		[]string{backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1))
	re.NoError(err)

	// Create a TSO cluster which has 2 servers
	tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, backendEndpoints)
	re.NoError(err)
	tsoCluster.WaitForDefaultPrimaryServing(re)
	// The TSO service should be eventually healthy
	mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)

	// Restart the API cluster
	apiCluster, err = tests.RestartTestAPICluster(ctx, apiCluster)
	re.NoError(err)
	// The TSO service should be eventually healthy
	mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)

	// Restart the TSO cluster
	tsoCluster, err = mcs.RestartTestTSOCluster(ctx, tsoCluster)
	re.NoError(err)
	// The TSO service should be eventually healthy
	mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)

	tsoCluster.Destroy()
	apiCluster.Destroy()
	cancel()
	re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode"))
}

func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) {
	wg.Add(tsoRequestConcurrencyNumber)
	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
		go func() {
			defer wg.Done()
			cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ","))
			defer cli.Close()
			var ts, lastTS uint64
			for {
				select {