From dcfd9ba11f1ff17fdab87761ab88ab68d7783a85 Mon Sep 17 00:00:00 2001
From: Bin Shi <39923490+binshi-bing@users.noreply.github.com>
Date: Thu, 27 Apr 2023 22:41:53 -0700
Subject: [PATCH 1/8] tests: fix flaky TestMicroserviceTSOServer/TestConcurrentlyReset (#6396)

close tikv/pd#6385

Get a copy of now and then call Add on the copy (base), because now is shared
by all goroutines, and now.Add() adding to itself isn't atomic or
multi-goroutine safe.

Signed-off-by: Bin Shi
---
 tests/integrations/tso/server_test.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go
index c9c978e7d7f..96fc5d334b0 100644
--- a/tests/integrations/tso/server_test.go
+++ b/tests/integrations/tso/server_test.go
@@ -154,7 +154,10 @@ func (suite *tsoServerTestSuite) TestConcurrentlyReset() {
 		go func() {
 			defer wg.Done()
 			for j := 0; j <= 100; j++ {
-				physical := now.Add(time.Duration(2*j)*time.Minute).UnixNano() / int64(time.Millisecond)
+				// Get a copy of now then call base.add, because now is shared by all goroutines
+				// and now.add() will add to itself which isn't atomic and multi-goroutine safe.
+				base := now
+				physical := base.Add(time.Duration(2*j)*time.Minute).UnixNano() / int64(time.Millisecond)
 				ts := uint64(physical << 18)
 				suite.resetTS(ts, false, false)
 			}

From 87c49843942824d62320f4179a8ae8793369429b Mon Sep 17 00:00:00 2001
From: Bin Shi <39923490+binshi-bing@users.noreply.github.com>
Date: Thu, 27 Apr 2023 23:21:52 -0700
Subject: [PATCH 2/8] keyspace, slice: improve code efficiency in membership ops (#6392)

ref tikv/pd#6231

Improve code efficiency in membership ops.

Signed-off-by: Bin Shi
---
 pkg/keyspace/tso_keyspace_group.go | 26 +++++++++++++------
 pkg/slice/slice.go | 11 ++++----
 pkg/slice/slice_test.go | 25 +++++++++++++++++-
 .../mcs/keyspace/tso_keyspace_group_test.go | 4 +--
 4 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go
index 36b45f885d6..9ea65e92b44 100644
--- a/pkg/keyspace/tso_keyspace_group.go
+++ b/pkg/keyspace/tso_keyspace_group.go
@@ -516,21 +516,30 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
 	if kg.IsSplitting() {
 		return ErrKeyspaceGroupInSplit
 	}
+
+	changed := false
+
 	switch mutation {
 	case opAdd:
 		if !slice.Contains(kg.Keyspaces, keyspaceID) {
 			kg.Keyspaces = append(kg.Keyspaces, keyspaceID)
+			changed = true
 		}
 	case opDelete:
-		if slice.Contains(kg.Keyspaces, keyspaceID) {
-			kg.Keyspaces = slice.Remove(kg.Keyspaces, keyspaceID)
+		lenOfKeyspaces := len(kg.Keyspaces)
+		kg.Keyspaces = slice.Remove(kg.Keyspaces, keyspaceID)
+		if lenOfKeyspaces != len(kg.Keyspaces) {
+			changed = true
 		}
 	}
-	if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
-		return err
-	}
-	m.groups[userKind].Put(kg)
+
+	if changed {
+		if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
+			return err
+		}
+
+		m.groups[userKind].Put(kg)
+	}
 	return nil
 }
 
@@ -569,8 +578,9 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
 		updateNew = true
 	}
 
-	if slice.Contains(oldKG.Keyspaces, keyspaceID) {
-		oldKG.Keyspaces = slice.Remove(oldKG.Keyspaces, keyspaceID)
+	lenOfOldKeyspaces := len(oldKG.Keyspaces)
+	oldKG.Keyspaces = slice.Remove(oldKG.Keyspaces, keyspaceID)
+	if lenOfOldKeyspaces != len(oldKG.Keyspaces) {
 		updateOld = true
 	}
 
diff --git a/pkg/slice/slice.go b/pkg/slice/slice.go
index 3400a639666..b3741593670 100644
--- a/pkg/slice/slice.go
+++ b/pkg/slice/slice.go
@@ -49,11 +49,12 @@ func Contains[T comparable](slice []T, value T) bool {
 
 // Remove removes the value from the slice.
 func Remove[T comparable](slice []T, value T) []T {
-	for i := 0; i < len(slice); i++ {
-		if slice[i] == value {
-			slice = append(slice[:i], slice[i+1:]...)
-			i--
+	i, j := 0, 0
+	for ; i < len(slice); i++ {
+		if slice[i] != value {
+			slice[j] = slice[i]
+			j++
 		}
 	}
-	return slice
+	return slice[:j]
 }
diff --git a/pkg/slice/slice_test.go b/pkg/slice/slice_test.go
index f89f8a50546..1fe3fe79dcf 100644
--- a/pkg/slice/slice_test.go
+++ b/pkg/slice/slice_test.go
@@ -60,7 +60,7 @@ func TestSliceContains(t *testing.T) {
 	re.False(slice.Contains(is, int64(4)))
 }
 
-func TestSliceRemove(t *testing.T) {
+func TestSliceRemoveGenericTypes(t *testing.T) {
 	t.Parallel()
 	re := require.New(t)
 	ss := []string{"a", "b", "c"}
@@ -75,3 +75,26 @@ func TestSliceRemove(t *testing.T) {
 	is = slice.Remove(is, 1)
 	re.Equal([]int64{2, 3}, is)
 }
+
+func TestSliceRemove(t *testing.T) {
+	t.Parallel()
+	re := require.New(t)
+
+	is := []int64{}
+	is = slice.Remove(is, 1)
+	re.Equal([]int64{}, is)
+
+	is = []int64{1}
+	is = slice.Remove(is, 2)
+	re.Equal([]int64{1}, is)
+	is = slice.Remove(is, 1)
+	re.Equal([]int64{}, is)
+
+	is = []int64{1, 2, 3}
+	is = slice.Remove(is, 1)
+	re.Equal([]int64{2, 3}, is)
+
+	is = []int64{1, 1, 1}
+	is = slice.Remove(is, 1)
+	re.Equal([]int64{}, is)
+}
diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
index 91186ca8211..6d2c51e1dd0 100644
--- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
+++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
@@ -82,7 +82,7 @@ func (suite *keyspaceGroupTestSuite) TearDownTest() {
 func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
 	// add three nodes.
 	nodes := make(map[string]bs.Server)
-	for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount+2; i++ {
+	for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount+1; i++ {
 		s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
 		defer cleanup()
 		nodes[s.GetAddr()] = s
@@ -113,7 +113,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
 		oldMembers[member.Address] = struct{}{}
 	}
 
-	// alloc node update to 2.
+	// alloc node update to 3.
 	params.Replica = utils.KeyspaceGroupDefaultReplicaCount + 1
 	got, code = suite.tryAllocNodesForKeyspaceGroup(id, params)
 	suite.Equal(http.StatusOK, code)

From a0613d83df32e0cdd24e5aa540490c5cd58eaf93 Mon Sep 17 00:00:00 2001
From: JmPotato
Date: Fri, 28 Apr 2023 15:51:53 +0800
Subject: [PATCH 3/8] tests: enable TestTSOKeyspaceGroupSplitClient (#6398)

ref tikv/pd#6232

Enable `TestTSOKeyspaceGroupSplitClient`.
Signed-off-by: JmPotato
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 .../mcs/tso/keyspace_group_manager_test.go | 27 ++++++++++++++-----
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
index 38000c257ce..3cc48e682b9 100644
--- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go
+++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
@@ -16,6 +16,7 @@ package tso
 
 import (
 	"context"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -251,9 +252,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection
 }
 
 func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() {
-	// TODO: remove the skip after the client is able to support multi-keyspace-group.
-	suite.T().SkipNow()
-
 	re := suite.Require()
 	// Create the keyspace group 1 with keyspaces [111, 222, 333].
 	handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
@@ -287,13 +285,27 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
 		for {
 			select {
 			case <-ctx.Done():
+				// Make sure at least one TSO request is successful.
+				re.NotEmpty(lastPhysical)
 				return
 			default:
 			}
 			physical, logical, err := tsoClient.GetTS(ctx)
-			re.NoError(err)
-			re.Greater(physical, lastPhysical)
-			re.Greater(logical, lastLogical)
+			if err != nil {
+				errMsg := err.Error()
+				// Ignore the errors caused by the split and context cancellation.
+				if strings.Contains(errMsg, "context canceled") ||
+					strings.Contains(errMsg, "not leader") ||
+					strings.Contains(errMsg, "ErrKeyspaceNotAssigned") {
+					continue
+				}
+				re.FailNow(errMsg)
+			}
+			if physical == lastPhysical {
+				re.Greater(logical, lastLogical)
+			} else {
+				re.Greater(physical, lastPhysical)
+			}
 			lastPhysical, lastLogical = physical, logical
 		}
 	}()
@@ -308,6 +320,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
 	re.True(kg2.IsSplitTarget())
 	// Finish the split.
 	handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2)
+	// Wait for a while to make sure the client has received the new TSO.
+	time.Sleep(time.Second)
+	// Stop the client.
 	cancel()
 	wg.Wait()
 }

From a4f9af3444137f659cdd495534564312fd05a0d2 Mon Sep 17 00:00:00 2001
From: Bin Shi <39923490+binshi-bing@users.noreply.github.com>
Date: Thu, 4 May 2023 16:15:56 +0700
Subject: [PATCH 4/8] tests: add more tests for multiple keyspace groups (#6395)

ref tikv/pd#5895

Add CheckMultiKeyspacesTSO() and WaitForMultiKeyspacesTSOAvailable in
test utility.
Add TestTSOKeyspaceGroupManager/TestKeyspacesServedByNonDefaultKeyspaceGroup.
Cover TestGetTS, TestGetTSAsync, TestUpdateAfterResetTSO in
TestMicroserviceTSOClient for multiple keyspace groups.
Signed-off-by: Bin Shi
---
 pkg/mcs/tso/server/server.go | 7 +-
 pkg/tso/keyspace_group_manager.go | 2 +-
 tests/integrations/mcs/cluster.go | 13 +-
 tests/integrations/mcs/testutil.go | 87 ++++++-
 .../mcs/tso/keyspace_group_manager_test.go | 79 ++++++-
 tests/integrations/mcs/tso/server_test.go | 23 +-
 tests/integrations/tso/client_test.go | 222 +++++++++++-------
 tests/server/tso/common_test.go | 2 +-
 8 files changed, 327 insertions(+), 108 deletions(-)

diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go
index d9866b7a9db..25a832c5f48 100644
--- a/pkg/mcs/tso/server/server.go
+++ b/pkg/mcs/tso/server/server.go
@@ -244,10 +244,9 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb
 	return member, nil
 }
 
-// ResignPrimary resigns the primary of the given keyspace and keyspace group.
-func (s *Server) ResignPrimary() error {
-	member, err := s.keyspaceGroupManager.GetElectionMember(
-		mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
+// ResignPrimary resigns the primary of the given keyspace.
+func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
+	member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go
index 4c9e8dac4ca..0089e0d9bdc 100644
--- a/pkg/tso/keyspace_group_manager.go
+++ b/pkg/tso/keyspace_group_manager.go
@@ -812,7 +812,7 @@ func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID(
 	return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil
 }
 
-// GetElectionMember returns the election member of the given keyspace group
+// GetElectionMember returns the election member of the keyspace group serving the given keyspace.
 func (kgm *KeyspaceGroupManager) GetElectionMember(
 	keyspaceID, keyspaceGroupID uint32,
 ) (ElectionMember, error) {
diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go
index 228f506454d..dbc9964b62b 100644
--- a/tests/integrations/mcs/cluster.go
+++ b/tests/integrations/mcs/cluster.go
@@ -16,6 +16,7 @@ package mcs
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/stretchr/testify/require"
@@ -92,12 +93,16 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
 }
 
 // ResignPrimary resigns the primary TSO server.
-func (tc *TestTSOCluster) ResignPrimary() {
-	tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
+func (tc *TestTSOCluster) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
+	primaryServer := tc.GetPrimaryServer(keyspaceID, keyspaceGroupID)
+	if primaryServer == nil {
+		return fmt.Errorf("no tso server serves this keyspace %d", keyspaceID)
+	}
+	return primaryServer.ResignPrimary(keyspaceID, keyspaceGroupID)
 }
 
-// GetPrimary returns the primary TSO server.
-func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
+// GetPrimaryServer returns the primary TSO server of the given keyspace
+func (tc *TestTSOCluster) GetPrimaryServer(keyspaceID, keyspaceGroupID uint32) *tso.Server {
 	for _, server := range tc.servers {
 		if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
 			return server
diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go
index f9c47aa56f0..3ca1ad39436 100644
--- a/tests/integrations/mcs/testutil.go
+++ b/tests/integrations/mcs/testutil.go
@@ -29,6 +29,7 @@ import (
 	tso "github.com/tikv/pd/pkg/mcs/tso/server"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"github.com/tikv/pd/pkg/utils/testutil"
+	"github.com/tikv/pd/pkg/utils/tsoutil"
 )
 
 var once sync.Once
@@ -48,13 +49,25 @@ func InitLogger(cfg *tso.Config) (err error) {
 	return err
 }
 
-// SetupClientWithKeyspace creates a TSO client for test.
-func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
+// SetupClientWithDefaultKeyspaceName creates a TSO client with default keyspace name for test.
+func SetupClientWithDefaultKeyspaceName(
+	ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption,
+) pd.Client {
 	cli, err := pd.NewClientWithKeyspaceName(ctx, "", endpoints, pd.SecurityOption{}, opts...)
 	re.NoError(err)
 	return cli
 }
 
+// SetupClientWithKeyspaceID creates a TSO client with the given keyspace id for test.
+func SetupClientWithKeyspaceID(
+	ctx context.Context, re *require.Assertions,
+	keyspaceID uint32, endpoints []string, opts ...pd.ClientOption,
+) pd.Client {
+	cli, err := pd.NewClientWithKeyspace(ctx, keyspaceID, endpoints, pd.SecurityOption{}, opts...)
+	re.NoError(err)
+	return cli
+}
+
 // StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
 func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
 	cfg := rm.NewConfig()
@@ -137,3 +150,73 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error {
 	}
 	return errors.WithStack(err)
 }
+
+// CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces.
+func CheckMultiKeyspacesTSO(
+	ctx context.Context, re *require.Assertions,
+	clients []pd.Client, parallelAct func(),
+) {
+	ctx, cancel := context.WithCancel(ctx)
+	wg := sync.WaitGroup{}
+	wg.Add(len(clients))
+
+	for _, client := range clients {
+		go func(cli pd.Client) {
+			defer wg.Done()
+			var ts, lastTS uint64
+			for {
+				select {
+				case <-ctx.Done():
+					// Make sure the lastTS is not empty
+					re.NotEmpty(lastTS)
+					return
+				default:
+				}
+				physical, logical, err := cli.GetTS(ctx)
+				// omit the error check since there are many kinds of errors
+				if err != nil {
+					continue
+				}
+				ts = tsoutil.ComposeTS(physical, logical)
+				re.Less(lastTS, ts)
+				lastTS = ts
+			}
+		}(client)
+	}
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		parallelAct()
+		cancel()
+	}()
+
+	wg.Wait()
+}
+
+// WaitForMultiKeyspacesTSOAvailable waits for the given keyspaces being served by the tso server side
+func WaitForMultiKeyspacesTSOAvailable(
+	ctx context.Context, re *require.Assertions,
+	keyspaceIDs []uint32, backendEndpoints []string,
+) []pd.Client {
+	wg := sync.WaitGroup{}
+	wg.Add(len(keyspaceIDs))
+
+	clients := make([]pd.Client, 0, len(keyspaceIDs))
+	for _, keyspaceID := range keyspaceIDs {
+		cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints)
+		re.NotNil(cli)
+		clients = append(clients, cli)
+
+		go func() {
+			defer wg.Done()
+			testutil.Eventually(re, func() bool {
+				_, _, err := cli.GetTS(ctx)
+				return err == nil
+			})
+		}()
+	}
+
+	wg.Wait()
+	return clients
+}
diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
index 3cc48e682b9..60cc8c1161d 100644
--- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go
+++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go
@@ -83,7 +83,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() {
 }
 
 func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) {
-	for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") {
+	keyspaceGroups := handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0")
+	for _, group := range keyspaceGroups {
 		// Do not delete default keyspace group.
 		if group.ID == mcsutils.DefaultKeyspaceGroupID {
 			continue
@@ -130,6 +131,80 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp
 			}
 		}
 	}
+
+	keyspaceIDs := []uint32{0, 1, 2, 3, 1000}
+	clients := mcs.WaitForMultiKeyspacesTSOAvailable(
+		suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
+	re.Equal(len(keyspaceIDs), len(clients))
+	mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
+		time.Sleep(3 * time.Second)
+	})
+}
+
+func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroups() {
+	// Create multiple keyspace groups, and every keyspace should be served by one of them
+	// on a tso server.
+	re := suite.Require()
+
+	params := []struct {
+		keyspaceGroupID uint32
+		keyspaceIDs     []uint32
+	}{
+		{0, []uint32{0, 10}},
+		{1, []uint32{1, 11}},
+		{2, []uint32{2, 12}},
+	}
+
+	for _, param := range params {
+		if param.keyspaceGroupID == 0 {
+			// we have already created default keyspace group, so we can skip it.
+			// keyspace 10 isn't assigned to any keyspace group, so they will be
+			// served by default keyspace group.
+			continue
+		}
+		handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
+			KeyspaceGroups: []*endpoint.KeyspaceGroup{
+				{
+					ID:        param.keyspaceGroupID,
+					UserKind:  endpoint.Standard.String(),
+					Members:   suite.tsoCluster.GetKeyspaceGroupMember(),
+					Keyspaces: param.keyspaceIDs,
+				},
+			},
+		})
+	}
+
+	testutil.Eventually(re, func() bool {
+		for _, param := range params {
+			for _, keyspaceID := range param.keyspaceIDs {
+				served := false
+				for _, server := range suite.tsoCluster.GetServers() {
+					if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) {
+						tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID)
+						re.NoError(err)
+						re.NotNil(tam)
+						served = true
+					}
+				}
+				if !served {
+					return false
+				}
+			}
+		}
+		return true
+	}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))
+
+	keyspaceIDs := make([]uint32, 0)
+	for _, param := range params {
+		keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...)
+	}
+
+	clients := mcs.WaitForMultiKeyspacesTSOAvailable(
+		suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
+	re.Equal(len(keyspaceIDs), len(clients))
+	mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
+		time.Sleep(3 * time.Second)
+	})
 }
 
 func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
@@ -160,7 +235,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
 	})
 	ts.Physical += time.Hour.Milliseconds()
 	// Set the TSO of the keyspace group 1 to a large value.
-	err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
+	err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
 	re.NoError(err)
 	// Split the keyspace group 1 to 2.
 	handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go
index da23ab1d1eb..d074c49a497 100644
--- a/tests/integrations/mcs/tso/server_test.go
+++ b/tests/integrations/mcs/tso/server_test.go
@@ -186,7 +186,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
 	_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc())
 	defer cleanup()
 
-	cli := mcs.SetupClientWithKeyspace(ctx, re, []string{backendEndpoints})
+	cli := mcs.SetupClientWithDefaultKeyspaceName(ctx, re, []string{backendEndpoints})
 	physical, logical, err := cli.GetTS(ctx)
 	re.NoError(err)
 	ts := tsoutil.ComposeTS(physical, logical)
@@ -349,13 +349,14 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() {
 
 type CommonTestSuite struct {
 	suite.Suite
-	ctx              context.Context
-	cancel           context.CancelFunc
-	cluster          *tests.TestCluster
-	tsoCluster       *mcs.TestTSOCluster
-	pdLeader         *tests.TestServer
-	tsoPrimary       *tso.Server
-	backendEndpoints string
+	ctx        context.Context
+	cancel     context.CancelFunc
+	cluster    *tests.TestCluster
+	tsoCluster *mcs.TestTSOCluster
+	pdLeader   *tests.TestServer
+	// tsoDefaultPrimaryServer is the primary server of the default keyspace group
+	tsoDefaultPrimaryServer *tso.Server
+	backendEndpoints        string
 }
 
 func TestCommonTestSuite(t *testing.T) {
@@ -380,7 +381,7 @@ func (suite *CommonTestSuite) SetupSuite() {
 	suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
 	suite.NoError(err)
 	suite.tsoCluster.WaitForDefaultPrimaryServing(re)
-	suite.tsoPrimary = suite.tsoCluster.GetPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
+	suite.tsoDefaultPrimaryServer = suite.tsoCluster.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
 }
 
 func (suite *CommonTestSuite) TearDownSuite() {
@@ -401,14 +402,14 @@ func (suite *CommonTestSuite) TearDownSuite() {
 
 func (suite *CommonTestSuite) TestAdvertiseAddr() {
 	re := suite.Require()
 
-	conf := suite.tsoPrimary.GetConfig()
+	conf := suite.tsoDefaultPrimaryServer.GetConfig()
 	re.Equal(conf.GetListenAddr(), conf.GetAdvertiseListenAddr())
 }
 
 func (suite *CommonTestSuite) TestMetrics() {
 	re := suite.Require()
 
-	resp, err := http.Get(suite.tsoPrimary.GetConfig().GetAdvertiseListenAddr() + "/metrics")
+	resp, err := http.Get(suite.tsoDefaultPrimaryServer.GetConfig().GetAdvertiseListenAddr() + "/metrics")
 	re.NoError(err)
 	defer resp.Body.Close()
 	re.Equal(http.StatusOK, resp.StatusCode)
diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index 2d41aad4b84..80518798179 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -28,10 +28,14 @@ import (
 	"github.com/stretchr/testify/suite"
 	pd "github.com/tikv/pd/client"
 	"github.com/tikv/pd/client/testutil"
+	mcsutils "github.com/tikv/pd/pkg/mcs/utils"
+	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/utils/tempurl"
 	"github.com/tikv/pd/pkg/utils/tsoutil"
+	"github.com/tikv/pd/server/apiv2/handlers"
 	"github.com/tikv/pd/tests"
 	"github.com/tikv/pd/tests/integrations/mcs"
+	handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
 )
 
 var r = rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -44,12 +48,14 @@ type tsoClientTestSuite struct {
 	cancel context.CancelFunc
 	// The PD cluster.
 	cluster *tests.TestCluster
+	// pdLeaderServer is the leader server of the PD cluster.
+	pdLeaderServer *tests.TestServer
 	// The TSO service in microservice mode.
 	tsoCluster *mcs.TestTSOCluster
 
 	backendEndpoints string
-
-	client pd.TSOClient
+	keyspaceIDs      []uint32
+	clients          []pd.Client
 }
 
 func TestLegacyTSOClient(t *testing.T) {
@@ -78,16 +84,56 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 	err = suite.cluster.RunInitialServers()
 	re.NoError(err)
 	leaderName := suite.cluster.WaitLeader()
-	pdLeader := suite.cluster.GetServer(leaderName)
-	re.NoError(pdLeader.BootstrapCluster())
-	suite.backendEndpoints = pdLeader.GetAddr()
+	suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
+	re.NoError(suite.pdLeaderServer.BootstrapCluster())
+	suite.backendEndpoints = suite.pdLeaderServer.GetAddr()
+	suite.keyspaceIDs = make([]uint32, 0)
+
 	if suite.legacy {
-		suite.client, err = pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
+		client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
 		re.NoError(err)
+		suite.keyspaceIDs = append(suite.keyspaceIDs, 0)
+		suite.clients = make([]pd.Client, 0)
+		suite.clients = append(suite.clients, client)
 	} else {
 		suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints)
 		re.NoError(err)
-		suite.client = mcs.SetupClientWithKeyspace(suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
+
+		params := []struct {
+			keyspaceGroupID uint32
+			keyspaceIDs     []uint32
+		}{
+			{0, []uint32{0, 10}},
+			{1, []uint32{1, 11}},
+			{2, []uint32{2}},
+		}
+
+		for _, param := range params {
+			if param.keyspaceGroupID == 0 {
+				// we have already created default keyspace group, so we can skip it.
+				// keyspace 10 isn't assigned to any keyspace group, so they will be
+				// served by default keyspace group.
+				continue
+			}
+			handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
+				KeyspaceGroups: []*endpoint.KeyspaceGroup{
+					{
+						ID:        param.keyspaceGroupID,
+						UserKind:  endpoint.Standard.String(),
+						Members:   suite.tsoCluster.GetKeyspaceGroupMember(),
+						Keyspaces: param.keyspaceIDs,
+					},
+				},
+			})
+		}
+
+		for _, param := range params {
+			suite.keyspaceIDs = append(suite.keyspaceIDs, param.keyspaceIDs...)
+		}
+
+		suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable(
+			suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ","))
+		re.Equal(len(suite.keyspaceIDs), len(suite.clients))
 	}
 }
 
@@ -101,42 +147,46 @@ func (suite *tsoClientTestSuite) TearDownSuite() {
 
 func (suite *tsoClientTestSuite) TestGetTS() {
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber)
+	wg.Add(tsoRequestConcurrencyNumber * len(suite.clients))
 	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
-		go func() {
-			defer wg.Done()
-			var lastTS uint64
-			for i := 0; i < tsoRequestRound; i++ {
-				physical, logical, err := suite.client.GetTS(suite.ctx)
-				suite.NoError(err)
-				ts := tsoutil.ComposeTS(physical, logical)
-				suite.Less(lastTS, ts)
-				lastTS = ts
-			}
-		}()
+		for _, client := range suite.clients {
+			go func(client pd.Client) {
+				defer wg.Done()
+				var lastTS uint64
+				for j := 0; j < tsoRequestRound; j++ {
+					physical, logical, err := client.GetTS(suite.ctx)
+					suite.NoError(err)
+					ts := tsoutil.ComposeTS(physical, logical)
+					suite.Less(lastTS, ts)
+					lastTS = ts
+				}
+			}(client)
+		}
 	}
 	wg.Wait()
 }
 
 func (suite *tsoClientTestSuite) TestGetTSAsync() {
 	var wg sync.WaitGroup
-	wg.Add(tsoRequestConcurrencyNumber)
+	wg.Add(tsoRequestConcurrencyNumber * len(suite.clients))
 	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
-		go func() {
-			defer wg.Done()
-			tsFutures := make([]pd.TSFuture, tsoRequestRound)
-			for i := range tsFutures {
-				tsFutures[i] = suite.client.GetTSAsync(suite.ctx)
-			}
-			var lastTS uint64 = math.MaxUint64
-			for i := len(tsFutures) - 1; i >= 0; i-- {
-				physical, logical, err := tsFutures[i].Wait()
-				suite.NoError(err)
-				ts := tsoutil.ComposeTS(physical, logical)
-				suite.Greater(lastTS, ts)
-				lastTS = ts
-			}
-		}()
+		for _, client := range suite.clients {
+			go func(client pd.Client) {
+				defer wg.Done()
+				tsFutures := make([]pd.TSFuture, tsoRequestRound)
+				for j := range tsFutures {
+					tsFutures[j] = client.GetTSAsync(suite.ctx)
+				}
+				var lastTS uint64 = math.MaxUint64
+				for j := len(tsFutures) - 1; j >= 0; j-- {
+					physical, logical, err := tsFutures[j].Wait()
+					suite.NoError(err)
+					ts := tsoutil.ComposeTS(physical, logical)
+					suite.Greater(lastTS, ts)
+					lastTS = ts
+				}
+			}(client)
+		}
 	}
 	wg.Wait()
 }
@@ -147,33 +197,36 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
 	ctx, cancel := context.WithCancel(suite.ctx)
 	defer cancel()
 
-	testutil.Eventually(re, func() bool {
-		_, _, err := suite.client.GetTS(ctx)
-		return err == nil
-	})
-	// Transfer leader to trigger the TSO resetting.
-	re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
-	oldLeaderName := suite.cluster.WaitLeader()
-	err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
-	re.NoError(err)
-	re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
-	newLeaderName := suite.cluster.WaitLeader()
-	re.NotEqual(oldLeaderName, newLeaderName)
-	// Request a new TSO.
-	testutil.Eventually(re, func() bool {
-		_, _, err := suite.client.GetTS(ctx)
-		return err == nil
-	})
-	// Transfer leader back.
-	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`))
-	err = suite.cluster.GetServer(newLeaderName).ResignLeader()
-	re.NoError(err)
-	// Should NOT panic here.
-	testutil.Eventually(re, func() bool {
-		_, _, err := suite.client.GetTS(ctx)
-		return err == nil
-	})
-	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
+	for i := 0; i < len(suite.clients); i++ {
+		client := suite.clients[i]
+		testutil.Eventually(re, func() bool {
+			_, _, err := client.GetTS(ctx)
+			return err == nil
+		})
+		// Transfer leader to trigger the TSO resetting.
+		re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
+		oldLeaderName := suite.cluster.WaitLeader()
+		err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
+		re.NoError(err)
+		re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
+		newLeaderName := suite.cluster.WaitLeader()
+		re.NotEqual(oldLeaderName, newLeaderName)
+		// Request a new TSO.
+		testutil.Eventually(re, func() bool {
+			_, _, err := client.GetTS(ctx)
+			return err == nil
+		})
+		// Transfer leader back.
+		re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`))
+		err = suite.cluster.GetServer(newLeaderName).ResignLeader()
+		re.NoError(err)
+		// Should NOT panic here.
+		testutil.Eventually(re, func() bool {
+			_, _, err := client.GetTS(ctx)
+			return err == nil
+		})
+		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
+	}
 }
 
 func (suite *tsoClientTestSuite) TestRandomResignLeader() {
@@ -181,29 +234,37 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 	defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 
-	ctx, cancel := context.WithCancel(suite.ctx)
-	var wg sync.WaitGroup
-	checkTSO(ctx, re, &wg, suite.backendEndpoints)
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+	parallelAct := func() {
 		// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
 		// currently, the time to discover tso service is usually a little longer than 1s, compared
 		// to the previous time taken < 1s.
 		n := r.Intn(2) + 3
 		time.Sleep(time.Duration(n) * time.Second)
 		if !suite.legacy {
-			suite.tsoCluster.ResignPrimary()
-			suite.tsoCluster.WaitForDefaultPrimaryServing(re)
+			wg := sync.WaitGroup{}
+			// Select the default keyspace and a randomly picked keyspace to test
+			keyspaceIDs := []uint32{mcsutils.DefaultKeyspaceID}
+			selectIdx := uint32(r.Intn(len(suite.keyspaceIDs)-1) + 1)
+			keyspaceIDs = append(keyspaceIDs, suite.keyspaceIDs[selectIdx])
+			wg.Add(len(keyspaceIDs))
+			for _, keyspaceID := range keyspaceIDs {
+				go func(keyspaceID uint32) {
+					defer wg.Done()
+					err := suite.tsoCluster.ResignPrimary(keyspaceID, mcsutils.DefaultKeyspaceGroupID)
+					re.NoError(err)
+					suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, 0)
+				}(keyspaceID)
+			}
+			wg.Wait()
 		} else {
 			err := suite.cluster.ResignLeader()
 			re.NoError(err)
 			suite.cluster.WaitLeader()
 		}
 		time.Sleep(time.Duration(n) * time.Second)
-		cancel()
-	}()
-	wg.Wait()
+	}
+
+	mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct)
 }
 
 func (suite *tsoClientTestSuite) TestRandomShutdown() {
@@ -211,12 +272,7 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 	defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
 
-	ctx, cancel := context.WithCancel(suite.ctx)
-	var wg sync.WaitGroup
-	checkTSO(ctx, re, &wg, suite.backendEndpoints)
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+	parallelAct := func() {
 		// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
 		// currently, the time to discover tso service is usually a little longer than 1s, compared
 		// to the previous time taken < 1s.
@@ -228,9 +284,9 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
 			suite.cluster.GetServer(suite.cluster.GetLeader()).GetServer().Close()
 		}
 		time.Sleep(time.Duration(n) * time.Second)
-		cancel()
-	}()
-	wg.Wait()
+	}
+
+	mcs.CheckMultiKeyspacesTSO(suite.ctx, re, suite.clients, parallelAct)
 	suite.TearDownSuite()
 	suite.SetupSuite()
 }
@@ -286,7 +342,7 @@ func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, b
 	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
 		go func() {
 			defer wg.Done()
-			cli := mcs.SetupClientWithKeyspace(ctx, re, strings.Split(backendEndpoints, ","))
+			cli := mcs.SetupClientWithDefaultKeyspaceName(ctx, re, strings.Split(backendEndpoints, ","))
 			var ts, lastTS uint64
 			for {
 				select {
diff --git a/tests/server/tso/common_test.go b/tests/server/tso/common_test.go
index f528103db84..877fcb10982 100644
--- a/tests/server/tso/common_test.go
+++ b/tests/server/tso/common_test.go
@@ -28,7 +28,7 @@ import (
 )
 
 const (
-	tsoRequestConcurrencyNumber = 5
+	tsoRequestConcurrencyNumber = 3
 	tsoRequestRound             = 30
 	tsoCount                    = 10
 )

From 46fdd968d7d128194f31a3361503a8bf1c9cffba Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Thu, 4 May 2023 17:35:55 +0800
Subject: [PATCH 5/8] tests: fix failpoint disable (#6401)

ref tikv/pd#4399

Signed-off-by: lhy1024
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 pkg/schedule/operator_controller_test.go | 1 +
 pkg/utils/etcdutil/etcdutil_test.go | 6 +++---
 tests/integrations/tso/client_test.go | 8 ++++----
 tests/server/tso/tso_test.go | 2 +-
 4 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/pkg/schedule/operator_controller_test.go b/pkg/schedule/operator_controller_test.go
index a8e162aead0..3e46688655a 100644
--- a/pkg/schedule/operator_controller_test.go
+++ b/pkg/schedule/operator_controller_test.go
@@ -306,6 +306,7 @@ func (suite *operatorControllerTestSuite) TestConcurrentRemoveOperator() {
 	oc.SetOperator(op1)
 
 	suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator", "return(true)"))
+	defer suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/concurrentRemoveOperator"))
 
 	var wg sync.WaitGroup
 	wg.Add(2)
diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go
index e63761ca9a9..e8aa901bee0 100644
--- a/pkg/utils/etcdutil/etcdutil_test.go
+++ b/pkg/utils/etcdutil/etcdutil_test.go
@@ -241,7 +241,7 @@ func TestEtcdClientSync(t *testing.T) {
 	re.Len(listResp3.Members, 1)
 	re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
 
-	require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
 }
 
 func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
@@ -251,13 +251,13 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))
 	err = checkEtcdWithHangLeader(t)
 	re.NoError(err)
-	require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
 
 	// Test with disable check.
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)"))
 	err = checkEtcdWithHangLeader(t)
 	re.Error(err)
-	require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck"))
 }
 
 func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index 80518798179..31258a23bd1 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -232,7 +232,7 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
 func (suite *tsoClientTestSuite) TestRandomResignLeader() {
 	re := suite.Require()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
-	defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
+	defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
 
 	parallelAct := func() {
 		// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
@@ -270,7 +270,7 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() {
 	re := suite.Require()
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
-	defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
+	defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
 
 	parallelAct := func() {
 		// After https://github.com/tikv/pd/issues/6376 is fixed, we can use a smaller number here.
 		// currently, the time to discover tso service is usually a little longer than 1s, compared
 		// to the previous time taken < 1s.
@@ -296,9 +296,9 @@ func TestMixedTSODeployment(t *testing.T) {
 	re := require.New(t)
 
 	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
-	defer re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval", "return(true)"))
+	defer re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval"))
 	re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
-	defer re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipUpdateServiceMode", "return(true)"))
+	defer re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateServiceMode"))
 
 	ctx, cancel := context.WithCancel(context.Background())
 	cluster, err := tests.NewTestCluster(ctx, 1)
diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go
index 7b318ad7d36..48df02a6c27 100644
--- a/tests/server/tso/tso_test.go
+++ b/tests/server/tso/tso_test.go
@@ -70,7 +70,7 @@ func TestLoadTimestamp(t *testing.T) {
 		re.Greater(newTS.GetPhysical()-lastTS.GetPhysical(), int64(0))
 	}
 
-	failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow"))
 }
 
 func requestLocalTSOs(re *require.Assertions, cluster *tests.TestCluster, dcLocationConfig map[string]string) map[string]*pdpb.Timestamp {

From 566beda84936fb1b11b7230c862984db43709501 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=90=86=E5=B7=A5=E7=94=B7?=
Date: Fri, 5 May 2023 13:29:55 +0800
Subject: [PATCH 6/8] election: update leadership_test.go (#6400)

close tikv/pd#6399

fix test code

Signed-off-by: HappyUncle
---
 pkg/election/leadership_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go
index 2857c89a881..63b25378518 100644
--- a/pkg/election/leadership_test.go
+++ b/pkg/election/leadership_test.go
@@ -110,7 +110,7 @@ func TestLeadership(t *testing.T) {
 	// Check the lease.
 	lease1 := leadership1.getLease()
 	re.NotNil(lease1)
-	lease2 := leadership1.getLease()
+	lease2 := leadership2.getLease()
 	re.NotNil(lease2)
 
 	re.True(lease1.IsExpired())

From 62b51dd7f998d0206809624b45f3268151f33b03 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Fri, 5 May 2023 15:57:56 +0800
Subject: [PATCH 7/8] client: retry load keyspace meta when creating a new client (#6402)

ref tikv/pd#5895

Signed-off-by: Ryan Leung
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 client/client.go | 26 ++++++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/client/client.go b/client/client.go
index b3885c53791..d9cb0358f3f 100644
--- a/client/client.go
+++ b/client/client.go
@@ -383,13 +383,35 @@ func NewClientWithKeyspaceName(ctx context.Context, keyspace string, svrAddrs []
 		c.cancel()
 		return nil, err
 	}
+	if err := c.initRetry(c.loadKeyspaceMeta, keyspace); err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+func (c *client) initRetry(f func(s string) error, str string) error {
+	var err error
+	for i := 0; i < c.option.maxRetryTimes; i++ {
+		if err = f(str); err == nil || strings.Contains(err.Error(), "ENTRY_NOT_FOUND") {
+			return nil
+		}
+		select {
+		case <-c.ctx.Done():
+			return err
+		case <-time.After(time.Second):
+		}
+	}
+	return errors.WithStack(err)
+}
+
+func (c *client) loadKeyspaceMeta(keyspace string) error {
 	keyspaceMeta, err := c.LoadKeyspace(context.TODO(), keyspace)
 	// Here we ignore ENTRY_NOT_FOUND error and it will set the keyspaceID to 0.
 	if err != nil && !strings.Contains(err.Error(), "ENTRY_NOT_FOUND") {
-		return nil, err
+		return err
 	}
 	c.keyspaceID = keyspaceMeta.GetId()
-	return c, nil
+	return nil
 }
 
 func (c *client) setup() error {

From 09e65317980290dbf7b588161285c5ddd3394c28 Mon Sep 17 00:00:00 2001
From: Bin Shi <39923490+binshi-bing@users.noreply.github.com>
Date: Fri, 5 May 2023 17:07:56 +0800
Subject: [PATCH 8/8] Fix test issue in TestRandomResignLeader. (#6410)

close tikv/pd#6404

We need to make sure the selected keyspaces are from different keyspace
groups, otherwise multiple goroutines below could try to resign the
primary of the same keyspace group and cause a race condition.

Signed-off-by: Bin Shi
---
 tests/integrations/tso/client_test.go | 26 ++++++++++++++++++--------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index 31258a23bd1..7034aa31547 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -53,6 +53,11 @@ type tsoClientTestSuite struct {
 	// The TSO service in microservice mode.
 	tsoCluster *mcs.TestTSOCluster
 
+	keyspaceGroups []struct {
+		keyspaceGroupID uint32
+		keyspaceIDs     []uint32
+	}
+
 	backendEndpoints string
 	keyspaceIDs      []uint32
 	clients          []pd.Client
@@ -99,7 +104,7 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 		suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints)
 		re.NoError(err)
 
-		params := []struct {
+		suite.keyspaceGroups = []struct {
 			keyspaceGroupID uint32
 			keyspaceIDs     []uint32
 		}{
@@ -108,7 +113,7 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 			{2, []uint32{2}},
 		}
 
-		for _, param := range params {
+		for _, param := range suite.keyspaceGroups {
 			if param.keyspaceGroupID == 0 {
 				// we have already created default keyspace group, so we can skip it.
 				// keyspace 10 isn't assigned to any keyspace group, so they will be
@@ -127,8 +132,8 @@ func (suite *tsoClientTestSuite) SetupSuite() {
 			})
 		}
 
-		for _, param := range params {
-			suite.keyspaceIDs = append(suite.keyspaceIDs, param.keyspaceIDs...)
+		for _, keyspaceGroup := range suite.keyspaceGroups {
+			suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...)
 		}
 
 		suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable(
@@ -242,10 +247,15 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() {
 		time.Sleep(time.Duration(n) * time.Second)
 		if !suite.legacy {
 			wg := sync.WaitGroup{}
-			// Select the default keyspace and a randomly picked keyspace to test
-			keyspaceIDs := []uint32{mcsutils.DefaultKeyspaceID}
-			selectIdx := uint32(r.Intn(len(suite.keyspaceIDs)-1) + 1)
-			keyspaceIDs = append(keyspaceIDs, suite.keyspaceIDs[selectIdx])
+			// Select the first keyspace from all keyspace groups. We need to make sure the selected
+			// keyspaces are from different keyspace groups, otherwise multiple goroutines below could
+			// try to resign the primary of the same keyspace group and cause race condition.
+			keyspaceIDs := make([]uint32, 0)
+			for _, keyspaceGroup := range suite.keyspaceGroups {
+				if len(keyspaceGroup.keyspaceIDs) > 0 {
+					keyspaceIDs = append(keyspaceIDs, keyspaceGroup.keyspaceIDs[0])
+				}
+			}
 			wg.Add(len(keyspaceIDs))
 			for _, keyspaceID := range keyspaceIDs {
 				go func(keyspaceID uint32) {