Skip to content

Commit

Permalink
store/tikv: retry TSO RPC (#24682)
Browse files Browse the repository at this point in the history
  • Loading branch information
youjiali1995 authored May 18, 2021
1 parent 44830b9 commit 3ec8c8b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 16 deletions.
14 changes: 5 additions & 9 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,15 +739,11 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
return
}
bo := retry.NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars)
now, err := c.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
now, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
err1 := bo.Backoff(retry.BoPDRPC, err)
if err1 != nil {
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
zap.Error(err))
return
}
continue
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
zap.Error(err))
return
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
Expand Down Expand Up @@ -999,7 +995,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
// from PD and plus one as our MinCommitTS.
if commitTSMayBeCalculated && c.needLinearizability() {
failpoint.Inject("getMinCommitTSFromTSO", nil)
latestTS, err := c.store.oracle.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
latestTS, err := c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope())
// If we fail to get a timestamp from PD, we just propagate the failure
// instead of falling back to the normal 2PC because a normal 2PC will
// also be likely to fail due to the same timestamp issue.
Expand Down
9 changes: 2 additions & 7 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,6 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// locks have been cleaned before GC.
expiredLocks := locks

callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
return false, errors.Trace(err)
}

txnInfos := make(map[uint64]uint64)
startTime := time.Now()
for _, l := range expiredLocks {
Expand All @@ -243,7 +238,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false, l)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}
Expand All @@ -257,7 +252,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
continue
}
if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true, l)
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l)
if err != nil {
return false, err
}
Expand Down

0 comments on commit 3ec8c8b

Please sign in to comment.