From e0e4d6205de8d7580d9cf86999068a2c91e1072c Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Mon, 31 May 2021 20:47:36 +0800 Subject: [PATCH] store/tikv: retry TSO RPC (#24682) (#24733) --- store/tikv/2pc.go | 14 +++++--------- store/tikv/lock_resolver.go | 9 ++------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index b1ceb069639c9..5d2eee2e9519d 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -731,15 +731,11 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { return } bo := NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars) - now, err := c.store.GetOracle().GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + now, err := c.store.getTimestampWithRetry(bo, oracle.GlobalTxnScope) if err != nil { - err1 := bo.Backoff(BoPDRPC, err) - if err1 != nil { - logutil.Logger(bo.ctx).Warn("keepAlive get tso fail", - zap.Error(err)) - return - } - continue + logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail", + zap.Error(err)) + return } uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS)) @@ -993,7 +989,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { // from PD as our MinCommitTS. if commitTSMayBeCalculated && c.needLinearizability() { failpoint.Inject("getMinCommitTSFromTSO", nil) - minCommitTS, err := c.store.oracle.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + minCommitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetUnionStore().GetOption(kv.TxnScope).(string)) // If we fail to get a timestamp from PD, we just propagate the failure // instead of falling back to the normal 2PC because a normal 2PC will // also be likely to fail due to the same timestamp issue. diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 7cab28c5c463e..9fb2b2fa3aaa3 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -227,11 +227,6 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // locks have been cleaned before GC. expiredLocks := locks - callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - if err != nil { - return false, errors.Trace(err) - } - txnInfos := make(map[uint64]uint64) startTime := time.Now() for _, l := range expiredLocks { @@ -241,7 +236,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi metrics.LockResolverCountWithExpired.Inc() // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false, l) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l) if err != nil { return false, err } @@ -255,7 +250,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi continue } if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok { - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true, l) + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l) if err != nil { return false, err }