Skip to content

Commit

Permalink
cherry pick pingcap#22723 to release-5.0
Browse files Browse the repository at this point in the history
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
sticnarf committed Apr 30, 2021
1 parent 1145e34 commit dbce38f
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 6 deletions.
92 changes: 92 additions & 0 deletions store/tikv/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,98 @@ func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC(c *C) {
c.Assert(globalTxn.committer.isAsyncCommit(), IsTrue)
}

func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit(c *C) {
keys := [][]byte{[]byte("k0"), []byte("k1")}
values := [][]byte{[]byte("v00"), []byte("v10")}
initTest := func() *twoPhaseCommitter {
t0 := s.begin(c)
err := t0.Set(keys[0], values[0])
c.Assert(err, IsNil)
err = t0.Set(keys[1], values[1])
c.Assert(err, IsNil)
err = t0.Commit(context.Background())
c.Assert(err, IsNil)

t1 := s.beginAsyncCommit(c)
err = t1.Set(keys[0], []byte("v01"))
c.Assert(err, IsNil)
err = t1.Set(keys[1], []byte("v11"))
c.Assert(err, IsNil)

committer, err := newTwoPhaseCommitterWithInit(t1, 1)
c.Assert(err, IsNil)
atomic.StoreUint64(&committer.lockTTL, 1)
committer.setAsyncCommit(true)
return committer
}
prewriteKey := func(committer *twoPhaseCommitter, idx int, fallback bool) {
bo := NewBackofferWithVars(context.Background(), 5000, nil)
loc, err := s.store.GetRegionCache().LocateKey(bo, keys[idx])
c.Assert(err, IsNil)
var batch batchMutations
batch.mutations = committer.mutations.Slice(idx, idx+1)
batch.region = RegionVerID{loc.Region.GetID(), loc.Region.GetConfVer(), loc.Region.GetVer()}
batch.isPrimary = bytes.Equal(keys[idx], committer.primary())
req := committer.buildPrewriteRequest(batch, 1)
if fallback {
req.Req.(*kvrpcpb.PrewriteRequest).MaxCommitTs = 1
}
resp, err := s.store.SendReq(bo, req, loc.Region, 5000)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
}
readKey := func(idx int) {
t2 := s.begin(c)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
val, err := t2.Get(ctx, keys[idx])
c.Assert(err, IsNil)
c.Assert(val, DeepEquals, values[idx])
}

// Case 1: Fallback primary, read primary
committer := initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, false)
readKey(0)
readKey(1)

// Case 2: Fallback primary, read secondary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, false)
readKey(1)
readKey(0)

// Case 3: Fallback secondary, read primary
committer = initTest()
prewriteKey(committer, 0, false)
prewriteKey(committer, 1, true)
readKey(0)
readKey(1)

// Case 4: Fallback secondary, read secondary
committer = initTest()
prewriteKey(committer, 0, false)
prewriteKey(committer, 1, true)
readKey(1)
readKey(0)

// Case 5: Fallback both, read primary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, true)
readKey(0)
readKey(1)

// Case 6: Fallback both, read secondary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, true)
readKey(1)
readKey(0)
}

type mockResolveClient struct {
inner Client
onResolveLock func(*kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error)
Expand Down
7 changes: 1 addition & 6 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
var currentTS uint64
var err error
var status TxnStatus
if l.UseAsyncCommit && !forceSyncCommit {
// Async commit doesn't need the current ts since it uses the minCommitTS.
currentTS = 0
// Set to 0 so as not to push forward min commit ts.
callerStartTS = 0
} else if l.TTL == 0 {
if l.TTL == 0 {
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**.
// In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
Expand Down

0 comments on commit dbce38f

Please sign in to comment.