store/tikv: new retry logic for RegionRequestSender (#25040)
youjiali1995 authored Jun 3, 2021
1 parent e2340c1 commit 06b80b7
Showing 16 changed files with 1,090 additions and 205 deletions.
session/pessimistic_test.go: 2 changes (2 additions & 0 deletions)
@@ -66,11 +66,13 @@ func (s *testPessimisticSuite) SetUpSuite(c *C) {
 	// Set it to 300ms for testing lock resolve.
 	atomic.StoreUint64(&tikv.ManagedLockTTL, 300)
 	tikv.PrewriteMaxBackoff = 500
+	tikv.VeryLongMaxBackoff = 500
 }
 
 func (s *testPessimisticSuite) TearDownSuite(c *C) {
 	s.testSessionSuiteBase.TearDownSuite(c)
 	tikv.PrewriteMaxBackoff = 20000
+	tikv.VeryLongMaxBackoff = 600000
 }
 
 func (s *testPessimisticSuite) TestPessimisticTxn(c *C) {
store/gcworker/gc_worker_test.go: 2 changes (1 addition & 1 deletion)
@@ -943,7 +943,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionM
 	mCluster := s.cluster.(*mocktikv.Cluster)
 	mCluster.Merge(s.initRegion.regionID, region2)
 	regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID)
-	err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch(
+	_, err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch(
 		retry.NewNoopBackoff(context.Background()),
 		&tikv.RPCContext{Region: regionID, Store: &tikv.Store{}},
 		[]*metapb.Region{regionMeta})
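The updated call site above discards an extra return value, which implies OnRegionEpochNotMatch now returns a second result in this commit (the RegionCache changes themselves live in files not shown on this page). A hedged sketch of a caller that uses that value; the name and meaning of the boolean are assumptions, not taken from the diff:

    // Assumed post-commit shape (hypothetical names): the boolean is read here as
    // "the cached region info was refreshed and the request is worth retrying".
    retriable, err := regionCache.OnRegionEpochNotMatch(bo, rpcCtx, currentRegions)
    if err != nil {
        return errors.Trace(err)
    }
    if retriable {
        // Retry the request against the refreshed region; otherwise give up and
        // let the caller re-locate the key range from scratch.
    }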
store/mockstore/unistore/rpc.go: 2 changes (2 additions & 0 deletions)
@@ -125,6 +125,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 	failpoint.Inject("rpcPrewriteResult", func(val failpoint.Value) {
 		if val != nil {
 			switch val.(string) {
+			case "timeout":
+				failpoint.Return(nil, errors.New("timeout"))
 			case "notLeader":
 				failpoint.Return(&tikvrpc.Response{
 					Resp: &kvrpcpb.PrewriteResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
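A sketch of how a test could drive the new "timeout" branch of the rpcPrewriteResult failpoint. The fully qualified failpoint path below is an assumption based on the package location; only the short name rpcPrewriteResult appears in the diff.

    // Hypothetical test setup; the failpoint path may differ in the real repository.
    fpPath := "github.com/pingcap/tidb/store/mockstore/unistore/rpcPrewriteResult"
    if err := failpoint.Enable(fpPath, `return("timeout")`); err != nil {
        t.Fatal(err)
    }
    defer failpoint.Disable(fpPath)
    // While enabled, SendRequest returns (nil, errors.New("timeout")) for prewrite
    // requests, so the commit path has to back off and retry.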
store/tikv/2pc.go: 61 changes (47 additions & 14 deletions)
@@ -44,6 +44,9 @@ import (
 	zap "go.uber.org/zap"
 )
 
+// If the duration of a single request exceeds the slowRequestThreshold, a warning log will be logged.
+const slowRequestThreshold = time.Minute
+
 type twoPhaseCommitAction interface {
 	handleSingleBatch(*twoPhaseCommitter, *Backoffer, batchMutations) error
 	tiKVTxnRegionsNumHistogram() prometheus.Observer
@@ -520,8 +523,7 @@ func (c *twoPhaseCommitter) groupMutations(bo *Backoffer, mutations CommitterMut
 			logutil.BgLogger().Info("2PC detect large amount of mutations on a single region",
 				zap.Uint64("region", group.region.GetID()),
 				zap.Int("mutations count", group.mutations.Len()))
-			// Use context.Background, this time should not add up to Backoffer.
-			if c.store.preSplitRegion(context.Background(), group) {
+			if c.store.preSplitRegion(bo.GetCtx(), group) {
 				didPreSplit = true
 			}
 		}
@@ -547,7 +549,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
 	switch act := action.(type) {
 	case actionPrewrite:
 		// Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest.
-		if bo.ErrorsNum() == 0 {
+		if !act.retry {
 			for _, group := range groups {
 				c.regionTxnSize[group.region.id] = group.mutations.Len()
 			}
@@ -612,7 +614,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
 	}
 	// Already spawned a goroutine for async commit transaction.
 	if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
-		secondaryBo := retry.NewBackofferWithVars(context.Background(), int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars)
+		secondaryBo := retry.NewBackofferWithVars(context.Background(), CommitSecondaryMaxBackoff, c.txn.vars)
 		go func() {
 			if c.sessionID > 0 {
 				failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) {
@@ -897,6 +899,9 @@ const (
 	tsoMaxBackoff = 15000
 )
 
+// VeryLongMaxBackoff is the max sleep time of transaction commit.
+var VeryLongMaxBackoff = uint64(600000) // 10mins
+
 func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
 	c.cleanWg.Add(1)
 	go func() {
@@ -993,13 +998,22 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 		c.setOnePC(true)
 		c.hasTriedOnePC = true
 	}
+
+	// TODO(youjiali1995): It's better to use different maxSleep for different operations
+	// and distinguish permanent errors from temporary errors, for example:
+	// - If all PDs are down, all requests to PD will fail due to network error.
+	//   The maxSleep shouldn't be very long in this case.
+	// - If the region isn't found in PD, it's possible the reason is write-stall.
+	//   The maxSleep can be long in this case.
+	bo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&VeryLongMaxBackoff)), c.txn.vars)
+
 	// If we want to use async commit or 1PC and also want linearizability across
 	// all nodes, we have to make sure the commit TS of this transaction is greater
 	// than the snapshot TS of all existent readers. So we get a new timestamp
 	// from PD and plus one as our MinCommitTS.
 	if commitTSMayBeCalculated && c.needLinearizability() {
 		failpoint.Inject("getMinCommitTSFromTSO", nil)
-		latestTS, err := c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope())
+		latestTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
 		// If we fail to get a timestamp from PD, we just propagate the failure
 		// instead of falling back to the normal 2PC because a normal 2PC will
 		// also be likely to fail due to the same timestamp issue.
@@ -1025,9 +1039,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 	if c.shouldWriteBinlog() {
 		binlogChan = c.binlog.Prewrite(ctx, c.primary())
 	}
-	prewriteBo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars)
+
 	start := time.Now()
-	err = c.prewriteMutations(prewriteBo, c.mutations)
+	err = c.prewriteMutations(bo, c.mutations)
 
 	if err != nil {
 		// TODO: Now we return an undetermined error as long as one of the prewrite
@@ -1044,12 +1058,14 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 
 	commitDetail := c.getDetail()
 	commitDetail.PrewriteTime = time.Since(start)
-	if prewriteBo.GetTotalSleep() > 0 {
-		atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(prewriteBo.GetTotalSleep())*int64(time.Millisecond))
+	// TODO(youjiali1995): Record the backoff time of the last finished batch. It doesn't make sense to aggregate all batches'.
+	if bo.GetTotalSleep() > 0 {
+		atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(bo.GetTotalSleep())*int64(time.Millisecond))
 		commitDetail.Mu.Lock()
-		commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, prewriteBo.GetTypes()...)
+		commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, bo.GetTypes()...)
 		commitDetail.Mu.Unlock()
 	}
+
 	if binlogChan != nil {
 		startWaitBinlog := time.Now()
 		binlogWriteResult := <-binlogChan
@@ -1184,7 +1200,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 		failpoint.Inject("asyncCommitDoNothing", func() {
 			failpoint.Return()
 		})
-		commitBo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars)
+		commitBo := retry.NewBackofferWithVars(ctx, CommitSecondaryMaxBackoff, c.txn.vars)
 		err := c.commitMutations(commitBo, c.mutations)
 		if err != nil {
 			logutil.Logger(ctx).Warn("2PC async commit failed", zap.Uint64("sessionID", c.sessionID),
@@ -1200,7 +1216,8 @@ func (c *twoPhaseCommitter) commitTxn(ctx context.Context, commitDetail *util.Co
 	c.txn.GetMemBuffer().DiscardValues()
 	start := time.Now()
 
-	commitBo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars)
+	// Use the VeryLongMaxBackoff to commit the primary key.
+	commitBo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&VeryLongMaxBackoff)), c.txn.vars)
 	err := c.commitMutations(commitBo, c.mutations)
 	commitDetail.CommitTime = time.Since(start)
 	if commitBo.GetTotalSleep() > 0 {
@@ -1466,6 +1483,20 @@ type batchMutations struct {
 	mutations CommitterMutations
 	isPrimary bool
 }
+
+func (b *batchMutations) relocate(bo *Backoffer, c *RegionCache) (bool, error) {
+	begin, end := b.mutations.GetKey(0), b.mutations.GetKey(b.mutations.Len()-1)
+	loc, err := c.LocateKey(bo, begin)
+	if err != nil {
+		return false, errors.Trace(err)
+	}
+	if !loc.Contains(end) {
+		return false, nil
+	}
+	b.region = loc.Region
+	return true, nil
+}
+
 type batched struct {
 	batches    []batchMutations
 	primaryIdx int
@@ -1550,7 +1581,7 @@ type batchExecutor struct {
 func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
 	action twoPhaseCommitAction, backoffer *Backoffer) *batchExecutor {
 	return &batchExecutor{rateLimit, nil, committer,
-		action, backoffer, 1 * time.Millisecond}
+		action, backoffer, 0}
 }
 
 // initUtils do initialize batchExecutor related policies like rateLimit util
@@ -1647,7 +1678,9 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error {
 		}
 	}
 	close(exitCh)
-	metrics.TiKVTokenWaitDuration.Observe(float64(batchExe.tokenWaitDuration.Nanoseconds()))
+	if batchExe.tokenWaitDuration > 0 {
+		metrics.TiKVTokenWaitDuration.Observe(float64(batchExe.tokenWaitDuration.Nanoseconds()))
+	}
 	return err
 }
 
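The new batchMutations.relocate helper re-looks-up the region for a batch's key range and reports whether the whole batch still falls in a single region. A hypothetical sketch of how a retry path could use it; this is not code from the commit, and names such as doActionOnMutations and GetRegionCache are reused from the surrounding file on the assumption they are unchanged:

    // Hypothetical retry flow after a region error on one batch.
    stillOneRegion, err := batch.relocate(bo, c.store.GetRegionCache())
    if err != nil {
        return errors.Trace(err)
    }
    if stillOneRegion {
        // relocate refreshed the batch's region id; retry the batch as-is.
        return action.handleSingleBatch(c, bo, batch)
    }
    // The keys now span several regions: regroup the mutations and redo the action.
    return c.doActionOnMutations(bo, action, batch.mutations)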
store/tikv/backoff.go: 3 changes (3 additions & 0 deletions)
@@ -29,6 +29,9 @@ type BackoffConfig = retry.Config
 // Maximum total sleep time(in ms) for kv/cop commands.
 const (
 	gcResolveLockMaxBackoff = 100000
+	pdRPCMaxBackoff         = 20000
+	// CommitSecondaryMaxBackoff is max sleep time of the 'commit' command
+	CommitSecondaryMaxBackoff = 41000
 )
 
 var (
store/tikv/client/client_batch.go: 1 change (1 addition & 0 deletions)
@@ -769,6 +769,7 @@ func sendBatchRequest(
 				zap.String("to", addr), zap.String("cause", ctx.Err().Error()))
 			return nil, errors.Trace(ctx.Err())
 		case <-timer.C:
+			atomic.StoreInt32(&entry.canceled, 1)
 			return nil, errors.SuspendStack(errors.Annotate(context.DeadlineExceeded, "wait recvLoop"))
 		}
 	}
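Setting entry.canceled before returning lets the other end of the batch pipeline know the caller has already given up waiting. A rough sketch of the consumer side, assuming a response-dispatch loop and field names (res) that are not visible in this diff:

    // Hypothetical dispatch loop: drop responses whose sender already timed out.
    if atomic.LoadInt32(&entry.canceled) == 1 {
        // The caller already returned context.DeadlineExceeded; discard the reply.
        continue
    }
    entry.res <- resp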
(The remaining 10 of the 16 changed files were not loaded on this page.)