From 80dfa016fee3eb194020992276d29b20e31edd0d Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 6 Jul 2023 17:14:15 +0800 Subject: [PATCH] client, tests: allow TSO fallback happens in TestMixedTSODeployment (#6740) close tikv/pd#6634 Introduce `WithAllowTSOFallback` client option to bypass the panic in `TestMixedTSODeployment`. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 22 ++++++++++++++++++++++ client/option.go | 4 ++++ client/tso_dispatcher.go | 20 +++++++++++++++++--- tests/integrations/tso/client_test.go | 9 ++++++--- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/client/client.go b/client/client.go index f6b12e25935..af890384026 100644 --- a/client/client.go +++ b/client/client.go @@ -249,6 +249,28 @@ func WithMaxErrorRetry(count int) ClientOption { } } +// WithMetricsLabels configures the client with metrics labels. +func WithMetricsLabels(labels prometheus.Labels) ClientOption { + return func(c *client) { + c.option.metricsLabels = labels + } +} + +// WithInitMetricsOption configures the client with metrics labels. +func WithInitMetricsOption(initMetrics bool) ClientOption { + return func(c *client) { + c.option.initMetrics = initMetrics + } +} + +// WithAllowTSOFallback configures the client with `allowTSOFallback` option. +// NOTICE: This should only be used for testing. +func WithAllowTSOFallback() ClientOption { + return func(c *client) { + c.option.allowTSOFallback = true + } +} + var _ Client = (*client)(nil) // serviceModeKeeper is for service mode switching. diff --git a/client/option.go b/client/option.go index e2fcfcbbef4..7f8592ad540 100644 --- a/client/option.go +++ b/client/option.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" ) @@ -51,6 +52,9 @@ type option struct { timeout time.Duration maxRetryTimes int enableForwarding bool + metricsLabels prometheus.Labels + initMetrics bool + allowTSOFallback bool // Dynamic options. dynamicOptions [dynamicOptionCount]atomic.Value diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 9fd5c586bf9..c93c281ec04 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -787,7 +787,22 @@ func (c *tsoClient) compareAndSwapTS( // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned // last time. if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { - log.Panic("[tso] timestamp fallback", + if !c.option.allowTSOFallback { + log.Panic("[tso] timestamp fallback", + zap.String("dc-location", dcLocation), + zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), + zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("last-tso-server", lastTSOInfo.tsoServer), + zap.String("cur-tso-server", curTSOInfo.tsoServer), + zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), + zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), + zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) + } + log.Error("[tso] timestamp fallback", zap.String("dc-location", dcLocation), zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), @@ -799,8 +814,7 @@ func (c *tsoClient) compareAndSwapTS( zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), - zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt), - ) + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) } lastTSOInfo.tsoServer = curTSOInfo.tsoServer lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index a1c2ec08565..6727877a1c7 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -436,7 +436,7 @@ func TestMixedTSODeployment(t *testing.T) { ctx1, cancel1 := context.WithCancel(context.Background()) var wg sync.WaitGroup - checkTSO(ctx1, re, &wg, backendEndpoints) + checkTSO(ctx1, re, &wg, backendEndpoints, pd.WithAllowTSOFallback() /* It's expected that the timestamp fallback happens here */) wg.Add(1) go func() { defer wg.Done() @@ -497,12 +497,15 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode")) } -func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) { +func checkTSO( + ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + backendEndpoints string, opts ...pd.ClientOption, +) { wg.Add(tsoRequestConcurrencyNumber) for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() - cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ",")) + cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ","), opts...) defer cli.Close() var ts, lastTS uint64 for {