Skip to content

Commit

Permalink
store/tikv: not setting special ts when resolving async locks (#22723)
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored Apr 30, 2021
1 parent 1342f3a commit b77338d
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 7 deletions.
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,6 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A=
Expand Down
8 changes: 2 additions & 6 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,8 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
var currentTS uint64
var err error
var status TxnStatus
if l.UseAsyncCommit && !forceSyncCommit {
// Async commit doesn't need the current ts since it uses the minCommitTS.
currentTS = 0
// Set to 0 so as not to push forward min commit ts.
callerStartTS = 0
} else if l.TTL == 0 {

if l.TTL == 0 {
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**.
// In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
Expand Down
7 changes: 7 additions & 0 deletions store/tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tikv

import (
"bytes"
"context"
"sync/atomic"
"time"
Expand Down Expand Up @@ -304,6 +305,12 @@ func (c CommitterProbe) BuildPrewriteRequest(regionID, regionConf, regionVersion
var batch batchMutations
batch.mutations = mutations
batch.region = RegionVerID{regionID, regionConf, regionVersion}
for _, key := range mutations.GetKeys() {
if bytes.Equal(key, c.primary()) {
batch.isPrimary = true
break
}
}
return c.buildPrewriteRequest(batch, txnSize)
}

Expand Down
89 changes: 89 additions & 0 deletions store/tikv/tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,95 @@ func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC(c *C) {
c.Assert(globalTxn.IsAsyncCommit(), IsTrue)
}

func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit(c *C) {
keys := [][]byte{[]byte("k0"), []byte("k1")}
values := [][]byte{[]byte("v00"), []byte("v10")}
initTest := func() tikv.CommitterProbe {
t0 := s.begin(c)
err := t0.Set(keys[0], values[0])
c.Assert(err, IsNil)
err = t0.Set(keys[1], values[1])
c.Assert(err, IsNil)
err = t0.Commit(context.Background())
c.Assert(err, IsNil)

t1 := s.beginAsyncCommit(c)
err = t1.Set(keys[0], []byte("v01"))
c.Assert(err, IsNil)
err = t1.Set(keys[1], []byte("v11"))
c.Assert(err, IsNil)

committer, err := t1.NewCommitter(1)
c.Assert(err, IsNil)
committer.SetLockTTL(1)
committer.SetUseAsyncCommit()
return committer
}
prewriteKey := func(committer tikv.CommitterProbe, idx int, fallback bool) {
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
loc, err := s.store.GetRegionCache().LocateKey(bo, keys[idx])
c.Assert(err, IsNil)
req := committer.BuildPrewriteRequest(loc.Region.GetID(), loc.Region.GetConfVer(), loc.Region.GetVer(),
committer.GetMutations().Slice(idx, idx+1), 1)
if fallback {
req.Req.(*kvrpcpb.PrewriteRequest).MaxCommitTs = 1
}
resp, err := s.store.SendReq(bo, req, loc.Region, 5000)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
}
readKey := func(idx int) {
t2 := s.begin(c)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
val, err := t2.Get(ctx, keys[idx])
c.Assert(err, IsNil)
c.Assert(val, DeepEquals, values[idx])
}

// Case 1: Fallback primary, read primary
committer := initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, false)
readKey(0)
readKey(1)

// Case 2: Fallback primary, read secondary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, false)
readKey(1)
readKey(0)

// Case 3: Fallback secondary, read primary
committer = initTest()
prewriteKey(committer, 0, false)
prewriteKey(committer, 1, true)
readKey(0)
readKey(1)

// Case 4: Fallback secondary, read secondary
committer = initTest()
prewriteKey(committer, 0, false)
prewriteKey(committer, 1, true)
readKey(1)
readKey(0)

// Case 5: Fallback both, read primary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, true)
readKey(0)
readKey(1)

// Case 6: Fallback both, read secondary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, true)
readKey(1)
readKey(0)
}

type mockResolveClient struct {
inner tikv.Client
onResolveLock func(*kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error)
Expand Down

0 comments on commit b77338d

Please sign in to comment.