Skip to content

Commit

Permalink
store/tikv: ignore all locks except the first met lock in autocommit …
Browse files Browse the repository at this point in the history
…get (#24084)
  • Loading branch information
sticnarf authored Apr 21, 2021
1 parent ef42b8e commit 59b8da7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 45 deletions.
38 changes: 12 additions & 26 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/unionstore"
"github.com/pingcap/tidb/store/tikv/util"
Expand Down Expand Up @@ -414,14 +413,6 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,

cli := NewClientHelper(s.store, s.resolvedLocks)

// Secondary locks or async commit locks cannot be ignored when getting using the max version.
// So we concurrently get a TS from PD and use it in retries to avoid unnecessary blocking.
var tsFuture oracle.Future
if s.version == maxTimestamp {
tsFuture = s.store.oracle.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.txnScope})
}
failpoint.Inject("snapshotGetTSAsync", nil)

isStaleness := false
var matchStoreLabels []*metapb.StoreLabel
s.mu.RLock()
Expand Down Expand Up @@ -450,7 +441,10 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
if len(matchStoreLabels) > 0 {
ops = append(ops, WithMatchLabels(matchStoreLabels))
}

var firstLock *Lock
for {
failpoint.Inject("beforeSendPointGet", nil)
loc, err := s.store.regionCache.LocateKey(bo, k)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -483,25 +477,17 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
if err != nil {
return nil, errors.Trace(err)
}

snapVer := s.version
if s.version == maxTimestamp {
newTS, err := tsFuture.Wait()
if err != nil {
return nil, errors.Trace(err)
}
s.version = newTS
req.Req.(*pb.GetRequest).Version = newTS
// skip lock resolving and backoff if the lock does not block the read
if newTS < lock.TxnID || newTS < lock.MinCommitTS {
continue
}
if firstLock == nil {
firstLock = lock
} else if s.version == maxTimestamp && firstLock.TxnID != lock.TxnID {
// If it is an autocommit point get, it needs to be blocked only
// by the first lock it meets. During retries, if the encountered
// lock is different from the first one, we can omit it.
cli.resolvedLocks.Put(lock.TxnID)
continue
}

// Use the original snapshot version to resolve locks so we can use MaxUint64
// as the callerStartTS if it's an auto-commit point get. This could save us
// one write at TiKV by not pushing forward the minCommitTS.
msBeforeExpired, err := cli.ResolveLocks(bo, snapVer, []*Lock{lock})
msBeforeExpired, err := cli.ResolveLocks(bo, s.version, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
65 changes: 46 additions & 19 deletions store/tikv/tests/snapshot_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,38 +140,65 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcScanResult"), IsNil)
}

func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) {
func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) {
defer s.cleanup(c)

// Prewrite k1 and k2 with async commit but don't commit them
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = txn.Set([]byte("k1"), []byte("v1"))
c.Assert(err, IsNil)
err = txn.Set([]byte("k2"), []byte("v2"))
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL", "return"), IsNil)
committer, err := txn.NewCommitter(1)
c.Assert(err, IsNil)
err = committer.Execute(context.Background())
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL"), IsNil)

snapshot := s.store.GetSnapshot(math.MaxUint64)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync", `pause`), IsNil)
ch := make(chan error)
getCh := make(chan []byte)
go func() {
_, err := snapshot.Get(context.Background(), []byte("k4"))
ch <- err
// Sleep a while to make the TTL of the first txn expire, then we make sure we resolve lock by this get
time.Sleep(200 * time.Millisecond)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeSendPointGet", "1*off->pause"), IsNil)
res, err := snapshot.Get(context.Background(), []byte("k2"))
c.Assert(err, IsNil)
getCh <- res
}()
// The get should be blocked by the failpoint. But the lock should have been resolved.
select {
case res := <-getCh:
c.Errorf("too early %s", string(res))
case <-time.After(1 * time.Second):
}

txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = txn.Set([]byte("k4"), []byte("v4"))
// Prewrite k1 and k2 again without committing them
txn, err = s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetOption(kv.Enable1PC, false)
txn.SetOption(kv.GuaranteeLinearizability, false)
// Prewrite an async-commit lock and do not commit it.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", `return`), IsNil)
committer, err := txn.NewCommitter(1)
err = txn.Set([]byte("k1"), []byte("v3"))
c.Assert(err, IsNil)
err = txn.Set([]byte("k2"), []byte("v4"))
c.Assert(err, IsNil)
committer, err = txn.NewCommitter(1)
c.Assert(err, IsNil)
// Sets its minCommitTS to one second later, so the lock will be ignored by point get.
committer.SetMinCommitTS(committer.GetStartTS() + (1000 << 18))
err = committer.Execute(context.Background())
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync"), IsNil)

err = <-ch
c.Assert(err, ErrorMatches, ".*key not exist")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeSendPointGet"), IsNil)

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil)
// After disabling the failpoint, the get request should bypass the new locks and read the old result
select {
case res := <-getCh:
c.Assert(res, DeepEquals, []byte("v2"))
case <-time.After(1 * time.Second):
c.Errorf("get timeout")
}
}

func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) {
Expand Down

0 comments on commit 59b8da7

Please sign in to comment.