Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: ignore all locks except the first one encountered in autocommit get #24084

Merged
merged 2 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 12 additions & 26 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/unionstore"
"github.com/pingcap/tidb/store/tikv/util"
Expand Down Expand Up @@ -414,14 +413,6 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,

cli := NewClientHelper(s.store, s.resolvedLocks)

// Secondary locks or async commit locks cannot be ignored when getting using the max version.
// So we concurrently get a TS from PD and use it in retries to avoid unnecessary blocking.
var tsFuture oracle.Future
if s.version == maxTimestamp {
tsFuture = s.store.oracle.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.txnScope})
}
failpoint.Inject("snapshotGetTSAsync", nil)

isStaleness := false
var matchStoreLabels []*metapb.StoreLabel
s.mu.RLock()
Expand Down Expand Up @@ -450,7 +441,10 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
if len(matchStoreLabels) > 0 {
ops = append(ops, WithMatchLabels(matchStoreLabels))
}

var firstLock *Lock
for {
failpoint.Inject("beforeSendPointGet", nil)
loc, err := s.store.regionCache.LocateKey(bo, k)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -483,25 +477,17 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
if err != nil {
return nil, errors.Trace(err)
}

snapVer := s.version
if s.version == maxTimestamp {
newTS, err := tsFuture.Wait()
if err != nil {
return nil, errors.Trace(err)
}
s.version = newTS
req.Req.(*pb.GetRequest).Version = newTS
// skip lock resolving and backoff if the lock does not block the read
if newTS < lock.TxnID || newTS < lock.MinCommitTS {
continue
}
if firstLock == nil {
firstLock = lock
} else if s.version == maxTimestamp && firstLock.TxnID != lock.TxnID {
// If it is an autocommit point get, it only needs to be blocked by
// the first lock it encounters. During retries, if the encountered
// lock is different from the first one, we can ignore it.
cli.resolvedLocks.Put(lock.TxnID)
sticnarf marked this conversation as resolved.
Show resolved Hide resolved
continue
}

// Use the original snapshot version to resolve locks so we can use MaxUint64
// as the callerStartTS if it's an auto-commit point get. This could save us
// one write at TiKV by not pushing forward the minCommitTS.
msBeforeExpired, err := cli.ResolveLocks(bo, snapVer, []*Lock{lock})
msBeforeExpired, err := cli.ResolveLocks(bo, s.version, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
65 changes: 46 additions & 19 deletions store/tikv/tests/snapshot_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,38 +140,65 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcScanResult"), IsNil)
}

func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) {
func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) {
defer s.cleanup(c)

// Prewrite k1 and k2 with async commit but don't commit them
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = txn.Set([]byte("k1"), []byte("v1"))
c.Assert(err, IsNil)
err = txn.Set([]byte("k2"), []byte("v2"))
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL", "return"), IsNil)
committer, err := txn.NewCommitter(1)
c.Assert(err, IsNil)
err = committer.Execute(context.Background())
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL"), IsNil)

snapshot := s.store.GetSnapshot(math.MaxUint64)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync", `pause`), IsNil)
ch := make(chan error)
getCh := make(chan []byte)
go func() {
_, err := snapshot.Get(context.Background(), []byte("k4"))
ch <- err
// Sleep for a while so that the TTL of the first txn expires, which ensures that this get resolves the lock.
time.Sleep(200 * time.Millisecond)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeSendPointGet", "1*off->pause"), IsNil)
res, err := snapshot.Get(context.Background(), []byte("k2"))
c.Assert(err, IsNil)
getCh <- res
}()
// The get should be blocked by the failpoint. But the lock should have been resolved.
select {
case res := <-getCh:
c.Errorf("too early %s", string(res))
case <-time.After(1 * time.Second):
}

txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = txn.Set([]byte("k4"), []byte("v4"))
// Prewrite k1 and k2 again without committing them
txn, err = s.store.Begin()
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetOption(kv.Enable1PC, false)
txn.SetOption(kv.GuaranteeLinearizability, false)
// Prewrite an async-commit lock and do not commit it.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", `return`), IsNil)
committer, err := txn.NewCommitter(1)
err = txn.Set([]byte("k1"), []byte("v3"))
c.Assert(err, IsNil)
err = txn.Set([]byte("k2"), []byte("v4"))
c.Assert(err, IsNil)
committer, err = txn.NewCommitter(1)
c.Assert(err, IsNil)
// Sets its minCommitTS to one second later, so the lock will be ignored by point get.
committer.SetMinCommitTS(committer.GetStartTS() + (1000 << 18))
err = committer.Execute(context.Background())
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync"), IsNil)

err = <-ch
c.Assert(err, ErrorMatches, ".*key not exist")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeSendPointGet"), IsNil)

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil)
// After disabling the failpoint, the get request should bypass the new locks and read the old result
select {
case res := <-getCh:
c.Assert(res, DeepEquals, []byte("v2"))
case <-time.After(1 * time.Second):
c.Errorf("get timeout")
}
}

func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) {
Expand Down