Skip to content

Commit

Permalink
store/tikv: use original snapshot timestamp to resolve locks (#23044)
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored Mar 8, 2021
1 parent 3b016a9 commit 9a75e50
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
2 changes: 1 addition & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
}
}
}
c.commitTS = commitTS
atomic.StoreUint64(&c.commitTS, commitTS)

if c.store.oracle.IsExpired(c.startTS, kv.MaxTxnTimeUse, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
err = errors.Errorf("session %d txn takes too much time, txnStartTS: %d, comm: %d",
Expand Down
6 changes: 5 additions & 1 deletion store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
return nil, errors.Trace(err)
}

snapVer := s.version.Ver
if s.version == kv.MaxVersion {
newTS, err := tsFuture.Wait()
if err != nil {
Expand All @@ -494,7 +495,10 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
}
}

msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
// Use the original snapshot version to resolve locks so we can use MaxUint64
// as the callerStartTS if it's an auto-commit point get. This could save us
// one write at TiKV by not pushing forward the minCommitTS.
msBeforeExpired, err := cli.ResolveLocks(bo, snapVer, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
45 changes: 45 additions & 0 deletions store/tikv/snapshot_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package tikv

import (
"context"
"sync/atomic"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -153,6 +155,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) {
err = txn.Set([]byte("k4"), []byte("v4"))
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, true)
txn.SetOption(kv.Enable1PC, false)
txn.SetOption(kv.GuaranteeLinearizability, false)
// Prewrite an async-commit lock and do not commit it.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", `return`), IsNil)
Expand All @@ -169,3 +172,45 @@ func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) {

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil)
}

func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) {
defer s.cleanup(c)

txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = txn.Set([]byte("k1"), []byte("v1"))
err = txn.Set([]byte("k2"), []byte("v2"))
c.Assert(err, IsNil)
txn.SetOption(kv.EnableAsyncCommit, false)
txn.SetOption(kv.Enable1PC, false)
txn.SetOption(kv.GuaranteeLinearizability, false)

// Prewrite the lock without committing it
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommit", `pause`), IsNil)
ch := make(chan struct{})
committer, err := newTwoPhaseCommitterWithInit(txn, 1)
c.Assert(committer.primary(), DeepEquals, []byte("k1"))
go func() {
c.Assert(err, IsNil)
err = committer.execute(context.Background())
c.Assert(err, IsNil)
ch <- struct{}{}
}()

// Wait until prewrite finishes
time.Sleep(200 * time.Millisecond)
// Should get nothing with max version, and **not pushing forward minCommitTS** of the primary lock
snapshot := s.store.GetSnapshot(kv.MaxVersion)
_, err = snapshot.Get(context.Background(), []byte("k2"))
c.Assert(err, ErrorMatches, ".*key not exist")

initialCommitTS := atomic.LoadUint64(&committer.commitTS)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommit"), IsNil)

<-ch
// check the minCommitTS is not pushed forward
snapshot = s.store.GetSnapshot(kv.Version{Ver: initialCommitTS})
v, err := snapshot.Get(context.Background(), []byte("k2"))
c.Assert(err, IsNil)
c.Assert(v, DeepEquals, []byte("v2"))
}

0 comments on commit 9a75e50

Please sign in to comment.