Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: support fallback from async commit (take 2) #21531

Merged
merged 29 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2543b3c
implement new resolver
sticnarf Dec 3, 2020
b3899e8
update proto
sticnarf Dec 4, 2020
1ff0410
disable async commit when 1pc fallback
sticnarf Dec 4, 2020
a289c9c
update kvproto
sticnarf Dec 7, 2020
2af8f9d
fix compile
sticnarf Dec 7, 2020
006fb08
add resolve lock tests
sticnarf Dec 7, 2020
6f86958
update kvproto
sticnarf Dec 7, 2020
5bf99bb
rename
sticnarf Dec 7, 2020
bd19a29
Merge branch 'master' into fallback-v2
sticnarf Dec 7, 2020
2be7b1c
more unit test
sticnarf Dec 7, 2020
4c62c04
Merge branch 'master' into fallback-v2
sticnarf Dec 7, 2020
6891747
fix unit test
sticnarf Dec 8, 2020
3cdb7ad
Merge branch 'fallback-v2' of https://github.com/sticnarf/tidb into f…
sticnarf Dec 8, 2020
e0dcc6e
Merge remote-tracking branch 'upstream/master' into fallback-v2
sticnarf Dec 8, 2020
80bd05d
update unistore to fix test
sticnarf Dec 8, 2020
b569fff
update kvproto
sticnarf Dec 8, 2020
54663b9
Merge remote-tracking branch 'upstream/master' into fallback-v2
sticnarf Dec 8, 2020
bee7dfd
Merge remote-tracking branch 'upstream/master' into fallback-v2
sticnarf Dec 8, 2020
df070a5
support commit with schema change
sticnarf Dec 8, 2020
cacb8c6
update unistore
sticnarf Dec 8, 2020
df070f3
Merge remote-tracking branch 'upstream/master' into fallback-v2
sticnarf Dec 9, 2020
caf5fe3
resolve comments
sticnarf Dec 9, 2020
9ad68e1
Merge remote-tracking branch 'upstream/master' into fallback-v2
sticnarf Dec 9, 2020
7ed4751
return error when minCommitTs is not zero during 1pc fallback
sticnarf Dec 9, 2020
1cdf3e8
Merge branch 'master' into fallback-v2
ti-srebot Dec 9, 2020
6ae246e
make lock test more stable
sticnarf Dec 10, 2020
00bc888
Merge remote-tracking branch 'upstream/master' into fallback-v2
sticnarf Dec 10, 2020
f9ce470
Merge branch 'fallback-v2' of https://github.com/sticnarf/tidb into f…
sticnarf Dec 10, 2020
b3b1acd
Merge branch 'master' into fallback-v2
ti-srebot Dec 10, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d
github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
Expand All @@ -45,7 +45,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3
github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/parser v0.0.0-20201203085211-44f6be1df1c4
github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k=
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d h1:hh0yCo0UtCuakNdkiRPaLHqzfgxacwUk6/pb9iJyJKU=
github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5 h1:inEktZjWoqSSRB8P6Zkj8cgwnbaAiSObeisgr/36L8U=
github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down Expand Up @@ -682,8 +682,8 @@ github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3 h1:cpYxg8ggZU3UhVVd4iafhzetjEl2xB1KVjuhEKOhmjU=
github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c h1:RbI6VpxZjaVVkeuxzEKCxw20+FWtXiIhgM+mvzhTc8I=
github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
8 changes: 2 additions & 6 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2068,9 +2068,7 @@ func (s *testPessimisticSuite) TestAsyncCommitWithSchemaChange(c *C) {
tk2.MustExec("alter table tk add index k2(c2)")
}()
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1200)"), IsNil)
_ = tk.ExecToErr("commit")
// TODO: wait for https://github.com/pingcap/tidb/pull/21531
// c.Assert(err, ErrorMatches, ".*commit TS \\d+ is too large")
tk.MustExec("commit")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
tk3.MustExec("admin check table tk")
}
Expand Down Expand Up @@ -2123,9 +2121,7 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
tk2.MustExec("alter table tk add index k2(c2)")
}()
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1000)"), IsNil)
_ = tk.ExecToErr("commit")
// TODO: Check the error after supporting falling back to 2PC in TiKV.
// c.Assert(err, IsNil)
tk.MustExec("commit")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
tk3.MustExec("admin check table tk")
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

lr := newLockResolver(s.store)
bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true, false)
c.Assert(err, IsNil)
c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)

Expand Down
4 changes: 3 additions & 1 deletion store/tikv/async_commit_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) {

primary := txn.committer.primary()
bo := NewBackofferWithVars(context.Background(), 5000, nil)
txnStatus, err := s.store.lockResolver.getTxnStatus(bo, txn.StartTS(), primary, 0, 0, false)
txnStatus, err := s.store.lockResolver.getTxnStatus(bo, txn.StartTS(), primary, 0, 0, false, false)
c.Assert(err, IsNil)
c.Assert(txnStatus.IsCommitted(), IsFalse)
c.Assert(txnStatus.action, Equals, kvrpcpb.Action_NoAction)
Expand All @@ -203,6 +203,8 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) {
c.Assert(gotSecondaries, DeepEquals, expectedSecondaries)

c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil)
txn.committer.cleanup(context.Background())
txn.committer.cleanWg.Wait()
}

test([]string{"a"}, []string{"a1"})
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) {
c.Assert(err, IsNil)
currentTS, err := s.store.oracle.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
c.Assert(err, IsNil)
status, err = s.store.lockResolver.getTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true)
status, err = s.store.lockResolver.getTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false)
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsTrue)
c.Assert(status.CommitTS(), Equals, ts)
Expand All @@ -234,7 +234,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) {
atomic.StoreInt64(&gotCheckA, 1)

resp = kvrpcpb.CheckSecondaryLocksResponse{
Locks: []*kvrpcpb.LockInfo{{Key: []byte("a"), PrimaryLock: []byte("z"), LockVersion: ts}},
Locks: []*kvrpcpb.LockInfo{{Key: []byte("a"), PrimaryLock: []byte("z"), LockVersion: ts, UseAsyncCommit: true}},
CommitTs: commitTs,
}
} else if bytes.Equal(k, []byte("i")) {
Expand Down
85 changes: 53 additions & 32 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
tikvLockResolverCountWithExpired.Inc()

// Using currentTS = math.MaxUint64 means rolling back the txn, no matter whether the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true)
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false)
if err != nil {
return false, err
}
Expand All @@ -242,11 +242,16 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// Then we need to check the secondary locks to determine the final status of the transaction.
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit {
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
if err != nil {
return false, err
if err == nil {
txnInfos[l.TxnID] = resolveData.commitTs
continue
}
if _, ok := err.(*nonAsyncCommitLock); ok {
sticnarf marked this conversation as resolved.
Show resolved Hide resolved
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true)
if err != nil {
return false, err
}
}
sticnarf marked this conversation as resolved.
Show resolved Hide resolved
txnInfos[l.TxnID] = resolveData.commitTs
continue
}

if status.ttl > 0 {
Expand Down Expand Up @@ -344,12 +349,11 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
pushed = make([]uint64, 0, len(locks))
}

for _, l := range locks {
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS)
var resolve func(*Lock, bool) error
resolve = func(l *Lock, forceSyncCommit bool) error {
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit)
if err != nil {
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
return msBeforeTxnExpired.value(), nil, err
return err
}

if status.ttl == 0 {
Expand All @@ -361,17 +365,18 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
cleanTxns[l.TxnID] = cleanRegions
}

if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !exists {
if status.primaryLock != nil && !forceSyncCommit && status.primaryLock.UseAsyncCommit && !exists {
err = lr.resolveLockAsync(bo, l, status)
if _, ok := err.(*nonAsyncCommitLock); ok {
err = resolve(l, true)
}
} else if l.LockType == kvrpcpb.Op_PessimisticLock {
err = lr.resolvePessimisticLock(bo, l, cleanRegions)
} else {
err = lr.resolveLock(bo, l, status, lite, cleanRegions)
}
if err != nil {
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
return msBeforeTxnExpired.value(), nil, err
return err
}
} else {
tikvLockResolverCountWithNotExpired.Inc()
Expand All @@ -386,16 +391,26 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
// This avoids the deadlock scenario between two large transactions.
if l.LockType != kvrpcpb.Op_PessimisticLock && l.TxnID > callerStartTS {
tikvLockResolverCountWithWriteConflict.Inc()
return msBeforeTxnExpired.value(), nil, kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key)
return kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key)
}
} else {
if status.action != kvrpcpb.Action_MinCommitTSPushed {
pushFail = true
continue
return nil
}
pushed = append(pushed, l.TxnID)
}
}
return nil
}

for _, l := range locks {
err := resolve(l, false)
if err != nil {
msBeforeTxnExpired.update(0)
err = errors.Trace(err)
return msBeforeTxnExpired.value(), nil, err
}
}
if pushFail {
// If any of the locks fails to push minCommitTS, don't return the pushed array.
Expand Down Expand Up @@ -451,14 +466,14 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary
if err != nil {
return status, err
}
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true)
return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, false)
}

func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, forceSyncCommit bool) (TxnStatus, error) {
var currentTS uint64
var err error
var status TxnStatus
if l.UseAsyncCommit {
if l.UseAsyncCommit && !forceSyncCommit {
// Async commit doesn't need the current ts since it uses the minCommitTS.
currentTS = 0
// Set to 0 so as not to push forward min commit ts.
Expand All @@ -481,7 +496,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
time.Sleep(100 * time.Millisecond)
})
for {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist)
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, forceSyncCommit)
if err == nil {
return status, nil
}
Expand Down Expand Up @@ -533,7 +548,8 @@ func (e txnNotFoundErr) Error() string {

// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) {
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte,
callerStartTS, currentTS uint64, rollbackIfNotExist bool, forceSyncCommit bool) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
Expand All @@ -556,6 +572,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
CallerStartTs: callerStartTS,
CurrentTs: currentTS,
RollbackIfNotExist: rollbackIfNotExist,
ForceSyncCommit: forceSyncCommit,
})
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
Expand Down Expand Up @@ -628,6 +645,12 @@ type asyncResolveData struct {
missingLock bool
}

// nonAsyncCommitLock is the error reported while collecting secondary locks
// when one of the locks returned by CheckSecondaryLocks does not have
// UseAsyncCommit set. Callers recognise it with a type assertion on
// *nonAsyncCommitLock and retry the status check with forceSyncCommit enabled.
type nonAsyncCommitLock struct{}

// Error implements the error interface and returns a fixed description.
func (*nonAsyncCommitLock) Error() string {
	const msg = "CheckSecondaryLocks receives a non-async-commit lock"
	return msg
}

// addKeys adds the keys from locks to data, keeping other fields up to date. startTS and commitTS are for the
// transaction being resolved.
//
Expand Down Expand Up @@ -671,7 +694,9 @@ func (data *asyncResolveData) addKeys(locks []*kvrpcpb.LockInfo, expected int, s
logutil.BgLogger().Error("addLocks error", zap.Error(err))
return err
}

if !lockInfo.UseAsyncCommit {
return &nonAsyncCommitLock{}
}
if !data.missingLock && lockInfo.MinCommitTs > data.commitTs {
data.commitTs = lockInfo.MinCommitTs
}
Expand Down Expand Up @@ -786,28 +811,24 @@ func (lr *LockResolver) checkAllSecondaries(bo *Backoffer, l *Lock, status *TxnS
}

errChan := make(chan error, len(regions))

checkBo, cancel := bo.Fork()
defer cancel()
for regionID, keys := range regions {
curRegionID := regionID
curKeys := keys

go func() {
errChan <- lr.checkSecondaries(bo, l.TxnID, curKeys, curRegionID, &shared)
errChan <- lr.checkSecondaries(checkBo, l.TxnID, curKeys, curRegionID, &shared)
}()
}

var errs []string
for range regions {
err1 := <-errChan
if err1 != nil {
errs = append(errs, err1.Error())
err := <-errChan
if err != nil {
return nil, err
}
}

if len(errs) > 0 {
return nil, errors.Errorf("async commit recovery (sending CheckSecondaryLocks) finished with errors: %v", errs)
}

return &shared, nil
}

Expand Down
Loading