Skip to content

Commit

Permalink
store/tikv: better handle undetermined error for committing primary k…
Browse files Browse the repository at this point in the history
…ey (#25115)
  • Loading branch information
youjiali1995 authored Jun 3, 2021
1 parent 41e1e81 commit 057250c
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions store/tikv/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
return metrics.TxnRegionsNumHistogramCommit
}

func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) (err error) {
func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
keys := batch.mutations.GetKeys()
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{
StartVersion: c.startTS,
Expand All @@ -54,18 +54,6 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
attempts := 0

sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
defer func() {
if err != nil {
// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
// transaction has been successfully committed.
// Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
// solution is to populate this error and let upper layer drop the connection to the corresponding mysql client.
if batch.isPrimary && sender.rpcError != nil && !c.isAsyncCommit() {
c.setUndeterminedErr(errors.Trace(sender.rpcError))
}
}
}()
for {
attempts++
if time.Since(tBegin) > slowRequestThreshold {
Expand All @@ -74,6 +62,15 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}

resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
// transaction has been successfully committed.
// Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
// solution is to populate this error and let upper layer drop the connection to the corresponding mysql client.
if batch.isPrimary && sender.rpcError != nil && !c.isAsyncCommit() {
c.setUndeterminedErr(errors.Trace(sender.rpcError))
}

// Unexpected error occurs, return it.
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -108,6 +105,11 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
return errors.Trace(tikverr.ErrBodyMissing)
}
commitResp := resp.Resp.(*pb.CommitResponse)
// Here we can make sure tikv has processed the commit primary key request. So
// we can clean undetermined error.
if batch.isPrimary && !c.isAsyncCommit() {
c.setUndeterminedErr(nil)
}
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
logutil.Logger(bo.GetCtx()).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS",
Expand Down

0 comments on commit 057250c

Please sign in to comment.