Skip to content

Commit

Permalink
tests: make the TSO testing as independent as possible (#6183)
Browse files Browse the repository at this point in the history
ref #6181

- Support testing both legacy and microservice TSO.
- Make the TSO testing as independent as possible.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Mar 21, 2023
1 parent 832047e commit 2b0cbae
Show file tree
Hide file tree
Showing 16 changed files with 1,632 additions and 378 deletions.
12 changes: 3 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ type Client interface {
// GetLeaderAddr returns current leader's address. It returns "" before
// syncing leader from server.
GetLeaderAddr() string
// GetTS gets a timestamp from PD.
GetTS(ctx context.Context) (int64, int64, error)
// GetTSAsync gets a timestamp from PD, without block the caller.
GetTSAsync(ctx context.Context) TSFuture
// GetLocalTS gets a local timestamp from PD.
GetLocalTS(ctx context.Context, dcLocation string) (int64, int64, error)
// GetLocalTSAsync gets a local timestamp from PD, without block the caller.
GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
Expand Down Expand Up @@ -144,7 +136,9 @@ type Client interface {
// SetExternalTimestamp sets external timestamp
SetExternalTimestamp(ctx context.Context, timestamp uint64) error

// MetaStorageClient returns the meta storage client.
// TSOClient is the TSO client.
TSOClient
// MetaStorageClient is the meta storage client.
MetaStorageClient
// KeyspaceClient manages keyspace metadata.
KeyspaceClient
Expand Down
13 changes: 9 additions & 4 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// TSOClient defines basic interface of the TSO client
// For test only
// TSOClient is the client used to get timestamps.
type TSOClient interface {
// GetTSOAllocators returns {dc-location -> TSO allocator serving URL} connection map
GetTSOAllocators() *sync.Map
// GetTS gets a timestamp from PD.
GetTS(ctx context.Context) (int64, int64, error)
// GetTSAsync gets a timestamp from PD, without block the caller.
GetTSAsync(ctx context.Context) TSFuture
// GetLocalTS gets a local timestamp from PD.
GetLocalTS(ctx context.Context, dcLocation string) (int64, int64, error)
// GetLocalTSAsync gets a local timestamp from PD, without block the caller.
GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture
}

type tsoRequest struct {
Expand Down
4 changes: 4 additions & 0 deletions pd.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
"name": "mcs-tests",
"path": "tests/mcs"
},
{
"name": "tso-tests",
"path": "tests/tso"
},
{
"name": "pd-tso-bench",
"path": "tools/pd-tso-bench"
Expand Down
10 changes: 10 additions & 0 deletions pkg/mcs/tso/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ package server
import (
"context"
"os"
"strings"

"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
"google.golang.org/grpc"
)

// NewTSOTestServer creates a tso server for testing.
Expand Down Expand Up @@ -57,3 +60,10 @@ func NewTSOTestDefaultConfig() (*Config, error) {
flagSet := cmd.Flags()
return cfg, cfg.Parse(flagSet)
}

// MustNewGrpcClient must create a new TSO grpc client.
func MustNewGrpcClient(re *require.Assertions, addr string) (*grpc.ClientConn, tsopb.TSOClient) {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())
re.NoError(err)
return conn, tsopb.NewTSOClient(conn)
}
3 changes: 1 addition & 2 deletions pkg/utils/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,9 @@ func NewRequestHeader(clusterID uint64) *pdpb.RequestHeader {
}
}

// MustNewGrpcClient must create a new grpc client.
// MustNewGrpcClient must create a new PD grpc client.
func MustNewGrpcClient(re *require.Assertions, addr string) pdpb.PDClient {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())

re.NoError(err)
return pdpb.NewPDClient(conn)
}
Expand Down
91 changes: 6 additions & 85 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,47 +207,6 @@ func TestLeaderTransfer(t *testing.T) {
wg.Wait()
}

// More details can be found in this issue: https://github.com/tikv/pd/issues/4884
func TestUpdateAfterResetTSO(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 2)
re.NoError(err)
defer cluster.Destroy()

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)

testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
// Transfer leader to trigger the TSO resetting.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
oldLeaderName := cluster.WaitLeader()
err = cluster.GetServer(oldLeaderName).ResignLeader()
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
newLeaderName := cluster.WaitLeader()
re.NotEqual(oldLeaderName, newLeaderName)
// Request a new TSO.
testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
// Transfer leader back.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp", `return(true)`))
err = cluster.GetServer(newLeaderName).ResignLeader()
re.NoError(err)
// Should NOT panic here.
testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delaySyncTimestamp"))
}

func TestTSOAllocatorLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -293,7 +252,7 @@ func TestTSOAllocatorLeader(t *testing.T) {

// Check allocator leaders URL map.
cli.Close()
for dcLocation, url := range getTSOAllocatorServingEndpointURLs(cli.(pd.TSOClient)) {
for dcLocation, url := range getTSOAllocatorServingEndpointURLs(cli.(TSOAllocatorsGetter)) {
if dcLocation == tso.GlobalDCLocation {
urls := innerCli.GetServiceDiscovery().GetURLs()
sort.Strings(urls)
Expand Down Expand Up @@ -411,6 +370,7 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
re.Less(maxUnavailableTime.UnixMilli(), leaderReadyTime.Add(1*time.Second).UnixMilli())
}

// TODO: migrate the Local/Global TSO tests to TSO integration test folder.
func TestGlobalAndLocalTSO(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -512,7 +472,10 @@ func requestGlobalAndLocalTSO(
wg.Wait()
}

func getTSOAllocatorServingEndpointURLs(c pd.TSOClient) map[string]string {
// GetTSOAllocators defines the TSO allocators getter.
type TSOAllocatorsGetter interface{ GetTSOAllocators() *sync.Map }

func getTSOAllocatorServingEndpointURLs(c TSOAllocatorsGetter) map[string]string {
allocatorLeaders := make(map[string]string)
c.GetTSOAllocators().Range(func(dcLocation, url interface{}) bool {
allocatorLeaders[dcLocation.(string)] = url.(string)
Expand Down Expand Up @@ -859,48 +822,6 @@ func (suite *clientTestSuite) bootstrapServer(header *pdpb.RequestHeader, client
suite.Equal(pdpb.ErrorType_OK, resp.GetHeader().GetError().GetType())
}

func (suite *clientTestSuite) TestNormalTSO() {
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
physical, logical, err := suite.client.GetTS(context.Background())
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Less(lastTS, ts)
lastTS = ts
}
}()
}
wg.Wait()
}

func (suite *clientTestSuite) TestGetTSAsync() {
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
tsFutures := make([]pd.TSFuture, tsoRequestRound)
for i := range tsFutures {
tsFutures[i] = suite.client.GetTSAsync(context.Background())
}
var lastTS uint64 = math.MaxUint64
for i := len(tsFutures) - 1; i >= 0; i-- {
physical, logical, err := tsFutures[i].Wait()
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Greater(lastTS, ts)
lastTS = ts
}
}()
}
wg.Wait()
}

func (suite *clientTestSuite) TestGetRegion() {
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Expand Down
102 changes: 0 additions & 102 deletions tests/mcs/tso/tso_service_test.go

This file was deleted.

Loading

0 comments on commit 2b0cbae

Please sign in to comment.