Skip to content

Commit

Permalink
store/tikv: export LockResolver and TxnStatus. (pingcap#1722)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Sep 14, 2016
1 parent a023230 commit f495ccc
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 18 deletions.
2 changes: 1 addition & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newTikvStore(uuid string, pdClient pd.Client, client Client, enableGC bool)
client: client,
regionCache: NewRegionCache(pdClient),
}
store.lockResolver = NewLockResolver(store)
store.lockResolver = newLockResolver(store)
if enableGC {
store.gcWorker, err = NewGCWorker(store)
if err != nil {
Expand Down
59 changes: 44 additions & 15 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package tikv

import (
"container/list"
"fmt"
"sync"

"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/pd/pd-client"
)

const resolvedCacheSize = 512
Expand All @@ -29,25 +31,42 @@ type LockResolver struct {
mu struct {
sync.RWMutex
// Cache resolved txns (FIFO, txn id -> txnStatus).
resolved map[uint64]txnStatus
resolved map[uint64]TxnStatus
recentResolved *list.List
}
}

// NewLockResolver creates a LockResolver.
func NewLockResolver(store *tikvStore) *LockResolver {
func newLockResolver(store *tikvStore) *LockResolver {
r := &LockResolver{
store: store,
}
r.mu.resolved = make(map[uint64]txnStatus)
r.mu.resolved = make(map[uint64]TxnStatus)
r.mu.recentResolved = list.New()
return r
}

type txnStatus uint64
// NewLockResolver creates a LockResolver.
func NewLockResolver(etcdAddrs []string, clusterID uint64) (*LockResolver, error) {
uuid := fmt.Sprintf("tikv-%v-%v", etcdAddrs, clusterID)
pdCli, err := pd.NewClient(etcdAddrs, clusterID)
if err != nil {
return nil, errors.Trace(err)
}
s, err := newTikvStore(uuid, &codecPDClient{pdCli}, newRPCClient(), false)
if err != nil {
return nil, errors.Trace(err)
}
return s.lockResolver, nil
}

func (s txnStatus) isCommitted() bool { return s > 0 }
func (s txnStatus) commitTS() uint64 { return uint64(s) }
// TxnStatus represents a txn's final status. It should be Commit or Rollback.
type TxnStatus uint64

// IsCommitted returns true if the txn's final status is Commit.
func (s TxnStatus) IsCommitted() bool { return s > 0 }

// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
func (s TxnStatus) CommitTS() uint64 { return uint64(s) }

// locks after 3000ms is considered unusual (the client created the lock might
// be dead). Other client may cleanup this kind of lock.
Expand All @@ -61,7 +80,7 @@ type Lock struct {
TxnID uint64
}

func (lr *LockResolver) saveResolved(txnID uint64, status txnStatus) {
func (lr *LockResolver) saveResolved(txnID uint64, status TxnStatus) {
lr.mu.Lock()
defer lr.mu.Unlock()

Expand All @@ -77,7 +96,7 @@ func (lr *LockResolver) saveResolved(txnID uint64, status txnStatus) {
}
}

func (lr *LockResolver) getResolved(txnID uint64) (txnStatus, bool) {
func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) {
lr.mu.RLock()
defer lr.mu.RUnlock()

Expand Down Expand Up @@ -132,11 +151,21 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, locks []*Lock) (ok bool, err
return len(expiredLocks) == len(locks), nil
}

func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte) (txnStatus, error) {
// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
// If the primary key is still locked, it will launch a Rollback to abort it.
// To avoid unnecessarily aborting too many txns, it is wiser to wait a few
// seconds before calling it after Prewrite.
func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) {
bo := NewBackoffer(cleanupMaxBackoff)
status, err := lr.getTxnStatus(bo, txnID, primary)
return status, errors.Trace(err)
}

func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
var status txnStatus
var status TxnStatus
req := &kvrpcpb.Request{
Type: kvrpcpb.MessageType_CmdCleanup,
CmdCleanupReq: &kvrpcpb.CmdCleanupRequest{
Expand Down Expand Up @@ -168,14 +197,14 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
return status, errors.Errorf("unexpected cleanup err: %s", keyErr)
}
if cmdResp.CommitVersion != 0 {
status = txnStatus(cmdResp.GetCommitVersion())
status = TxnStatus(cmdResp.GetCommitVersion())
}
lr.saveResolved(txnID, status)
return status, nil
}
}

func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status txnStatus, cleanRegions map[RegionVerID]struct{}) error {
func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cleanRegions map[RegionVerID]struct{}) error {
for {
region, err := lr.store.regionCache.GetRegion(bo, l.Key)
if err != nil {
Expand All @@ -190,8 +219,8 @@ func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status txnStatus, cl
StartVersion: l.TxnID,
},
}
if status.isCommitted() {
req.GetCmdResolveLockReq().CommitVersion = status.commitTS()
if status.IsCommitted() {
req.GetCmdResolveLockReq().CommitVersion = status.CommitTS()
}
resp, err := lr.store.SendKVReq(bo, req, region.VerID(), readTimeoutShort)
if err != nil {
Expand Down
25 changes: 23 additions & 2 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *testLockSuite) TearDownTest(c *C) {
s.store.Close()
}

func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byte, commitPrimary bool) {
func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
txn, err := newTiKVTxn(s.store)
c.Assert(err, IsNil)
if len(value) > 0 {
Expand Down Expand Up @@ -60,6 +60,7 @@ func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byt
err = committer.commitKeys(NewBackoffer(commitMaxBackoff), [][]byte{primaryKey})
c.Assert(err, IsNil)
}
return txn.startTS, committer.commitTS
}

func (s *testLockSuite) putAlphabets(c *C) {
Expand All @@ -68,13 +69,14 @@ func (s *testLockSuite) putAlphabets(c *C) {
}
}

func (s *testLockSuite) putKV(c *C, key, value []byte) {
func (s *testLockSuite) putKV(c *C, key, value []byte) (uint64, uint64) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = txn.Set(key, value)
c.Assert(err, IsNil)
err = txn.Commit()
c.Assert(err, IsNil)
return txn.StartTS(), txn.(*tikvTxn).commitTS
}

func (s *testLockSuite) prepareAlphabetLocks(c *C) {
Expand Down Expand Up @@ -150,6 +152,25 @@ func (s *testLockSuite) TestCleanLock(c *C) {
c.Assert(err, IsNil)
}

func (s *testLockSuite) TestGetTxnStatus(c *C) {
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
status, err := s.store.lockResolver.GetTxnStatus(startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsTrue)
c.Assert(status.CommitTS(), Equals, commitTS)

startTS, commitTS = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), true)
status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsTrue)
c.Assert(status.CommitTS(), Equals, commitTS)

startTS, _ = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), false)
status, err = s.store.lockResolver.GetTxnStatus(startTS, []byte("a"))
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsFalse)
}

func init() {
// Speed up tests.
lockTTL = 3
Expand Down

0 comments on commit f495ccc

Please sign in to comment.