Skip to content

Commit

Permalink
client, tests: allow TSO fallback happens in TestMixedTSODeployment (t…
Browse files Browse the repository at this point in the history
…ikv#6740)

close tikv#6634

Introduce `WithAllowTSOFallback` client option to bypass the panic in `TestMixedTSODeployment`.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent 00ccc5a commit 80dfa01
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
22 changes: 22 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,28 @@ func WithMaxErrorRetry(count int) ClientOption {
}
}

// WithMetricsLabels configures the client with metrics labels.
func WithMetricsLabels(labels prometheus.Labels) ClientOption {
return func(c *client) {
c.option.metricsLabels = labels
}
}

// WithInitMetricsOption configures the client with metrics labels.
func WithInitMetricsOption(initMetrics bool) ClientOption {
return func(c *client) {
c.option.initMetrics = initMetrics
}
}

// WithAllowTSOFallback configures the client with `allowTSOFallback` option.
// NOTICE: This should only be used for testing.
func WithAllowTSOFallback() ClientOption {
return func(c *client) {
c.option.allowTSOFallback = true
}
}

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
Expand Down
4 changes: 4 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -51,6 +52,9 @@ type option struct {
timeout time.Duration
maxRetryTimes int
enableForwarding bool
metricsLabels prometheus.Labels
initMetrics bool
allowTSOFallback bool

// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value
Expand Down
20 changes: 17 additions & 3 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,22 @@ func (c *tsoClient) compareAndSwapTS(
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
if !c.option.allowTSOFallback {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
}
log.Error("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
Expand All @@ -799,8 +814,7 @@ func (c *tsoClient) compareAndSwapTS(
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt),
)
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
}
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
Expand Down
9 changes: 6 additions & 3 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func TestMixedTSODeployment(t *testing.T) {

ctx1, cancel1 := context.WithCancel(context.Background())
var wg sync.WaitGroup
checkTSO(ctx1, re, &wg, backendEndpoints)
checkTSO(ctx1, re, &wg, backendEndpoints, pd.WithAllowTSOFallback() /* It's expected that the timestamp fallback happens here */)
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -497,12 +497,15 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode"))
}

func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) {
func checkTSO(
ctx context.Context, re *require.Assertions, wg *sync.WaitGroup,
backendEndpoints string, opts ...pd.ClientOption,
) {
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ","))
cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ","), opts...)
defer cli.Close()
var ts, lastTS uint64
for {
Expand Down

0 comments on commit 80dfa01

Please sign in to comment.