Skip to content

Commit

Permalink
store/tikv: clear undetermined error if txn determines to fail (#25120)
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored Jun 4, 2021
1 parent 6154335 commit 5a74d52
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
10 changes: 7 additions & 3 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type twoPhaseCommitter struct {
maxCommitTS uint64
prewriteStarted bool
prewriteCancelled uint32
prewriteFailed uint32
useOnePC uint32
onePCCommitTS uint64

Expand Down Expand Up @@ -1043,10 +1044,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
start := time.Now()
err = c.prewriteMutations(bo, c.mutations)

// Return an undetermined error only if we don't know the transaction fails.
// If it fails due to a write conflict or a already existed unique key, we
// needn't return an undetermined error even if such an error is set.
if atomic.LoadUint32(&c.prewriteFailed) == 1 {
c.setUndeterminedErr(nil)
}
if err != nil {
// TODO: Now we return an undetermined error as long as one of the prewrite
// RPCs fails. However, if there are multiple errors and some of the errors
// are not RPC failures, we can return the actual error instead of undetermined.
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
logutil.Logger(ctx).Error("2PC commit result undetermined",
zap.Error(err),
Expand Down
8 changes: 7 additions & 1 deletion store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
if err != nil {
return errors.Trace(err)
}
failpoint.Inject("forceRecursion", func() { same = false })
if same {
continue
}
Expand Down Expand Up @@ -276,12 +277,17 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist}
return c.extractKeyExistsErr(e)
err = c.extractKeyExistsErr(e)
if err != nil {
atomic.StoreUint32(&c.prewriteFailed, 1)
}
return err
}

// Extract lock from key error
lock, err1 := extractLockFromKeyErr(keyErr)
if err1 != nil {
atomic.StoreUint32(&c.prewriteFailed, 1)
return errors.Trace(err1)
}
logutil.BgLogger().Info("prewrite encounters lock",
Expand Down
26 changes: 26 additions & 0 deletions store/tikv/tests/async_commit_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,29 @@ func (s *testAsyncCommitFailSuite) TestAsyncCommitRPCErrorThenWriteConflict(c *C
c.Assert(err, NotNil)
c.Assert(txn.GetCommitter().GetUndeterminedErr(), IsNil)
}

// TestAsyncCommitRPCErrorThenWriteConflictInChild verifies that the determined failure error in a child recursion
// overwrites the undetermined error in the parent.
func (s *testAsyncCommitFailSuite) TestAsyncCommitRPCErrorThenWriteConflictInChild(c *C) {
// This test doesn't support tikv mode because it needs setting failpoint in unistore.
if *WithTiKV {
return
}

txn := s.beginAsyncCommit(c)
err := txn.Set([]byte("a"), []byte("va"))
c.Assert(err, IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcPrewriteResult", `1*return("timeout")->return("writeConflict")`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/forceRecursion", `return`), IsNil)

defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcPrewriteResult"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/forceRecursion"), IsNil)
}()

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
err = txn.Commit(ctx)
c.Assert(err, NotNil)
c.Assert(txn.GetCommitter().GetUndeterminedErr(), IsNil)
}

0 comments on commit 5a74d52

Please sign in to comment.