diff --git a/store/tikv/async_commit_test.go b/store/tikv/async_commit_test.go index 705b97de00e76..af8efaa74a656 100644 --- a/store/tikv/async_commit_test.go +++ b/store/tikv/async_commit_test.go @@ -411,6 +411,98 @@ func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC(c *C) { c.Assert(globalTxn.committer.isAsyncCommit(), IsTrue) } +func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit(c *C) { + keys := [][]byte{[]byte("k0"), []byte("k1")} + values := [][]byte{[]byte("v00"), []byte("v10")} + initTest := func() *twoPhaseCommitter { + t0 := s.begin(c) + err := t0.Set(keys[0], values[0]) + c.Assert(err, IsNil) + err = t0.Set(keys[1], values[1]) + c.Assert(err, IsNil) + err = t0.Commit(context.Background()) + c.Assert(err, IsNil) + + t1 := s.beginAsyncCommit(c) + err = t1.Set(keys[0], []byte("v01")) + c.Assert(err, IsNil) + err = t1.Set(keys[1], []byte("v11")) + c.Assert(err, IsNil) + + committer, err := newTwoPhaseCommitterWithInit(t1, 1) + c.Assert(err, IsNil) + atomic.StoreUint64(&committer.lockTTL, 1) + committer.setAsyncCommit(true) + return committer + } + prewriteKey := func(committer *twoPhaseCommitter, idx int, fallback bool) { + bo := NewBackofferWithVars(context.Background(), 5000, nil) + loc, err := s.store.GetRegionCache().LocateKey(bo, keys[idx]) + c.Assert(err, IsNil) + var batch batchMutations + batch.mutations = committer.mutations.Slice(idx, idx+1) + batch.region = RegionVerID{loc.Region.GetID(), loc.Region.GetConfVer(), loc.Region.GetVer()} + batch.isPrimary = bytes.Equal(keys[idx], committer.primary()) + req := committer.buildPrewriteRequest(batch, 1) + if fallback { + req.Req.(*kvrpcpb.PrewriteRequest).MaxCommitTs = 1 + } + resp, err := s.store.SendReq(bo, req, loc.Region, 5000) + c.Assert(err, IsNil) + c.Assert(resp.Resp, NotNil) + } + readKey := func(idx int) { + t2 := s.begin(c) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + val, err := t2.Get(ctx, keys[idx]) + c.Assert(err, IsNil) + c.Assert(val, DeepEquals, values[idx]) + } + + // Case 1: Fallback primary, read primary + committer := initTest() + prewriteKey(committer, 0, true) + prewriteKey(committer, 1, false) + readKey(0) + readKey(1) + + // Case 2: Fallback primary, read secondary + committer = initTest() + prewriteKey(committer, 0, true) + prewriteKey(committer, 1, false) + readKey(1) + readKey(0) + + // Case 3: Fallback secondary, read primary + committer = initTest() + prewriteKey(committer, 0, false) + prewriteKey(committer, 1, true) + readKey(0) + readKey(1) + + // Case 4: Fallback secondary, read secondary + committer = initTest() + prewriteKey(committer, 0, false) + prewriteKey(committer, 1, true) + readKey(1) + readKey(0) + + // Case 5: Fallback both, read primary + committer = initTest() + prewriteKey(committer, 0, true) + prewriteKey(committer, 1, true) + readKey(0) + readKey(1) + + // Case 6: Fallback both, read secondary + committer = initTest() + prewriteKey(committer, 0, true) + prewriteKey(committer, 1, true) + readKey(1) + readKey(0) +} + type mockResolveClient struct { inner Client onResolveLock func(*kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 27c4ac32bdd55..7cab28c5c463e 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -484,12 +484,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart var currentTS uint64 var err error var status TxnStatus - if l.UseAsyncCommit && !forceSyncCommit { - // Async commit doesn't need the current ts since it uses the minCommitTS. - currentTS = 0 - // Set to 0 so as not to push forward min commit ts. - callerStartTS = 0 - } else if l.TTL == 0 { + if l.TTL == 0 { // NOTE: l.TTL = 0 is a special protocol!!! // When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**. // In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!