store/tikv: refine the retry mechanism of region errors (#25165)
youjiali1995 authored Jun 5, 2021
1 parent f837bc7 commit ce0e3c3
Showing 5 changed files with 88 additions and 64 deletions.
18 changes: 10 additions & 8 deletions store/tikv/metrics/shortcuts.go
@@ -33,14 +33,15 @@ var (
RawkvSizeHistogramWithKey prometheus.Observer
RawkvSizeHistogramWithValue prometheus.Observer

BackoffHistogramRPC prometheus.Observer
BackoffHistogramLock prometheus.Observer
BackoffHistogramLockFast prometheus.Observer
BackoffHistogramPD prometheus.Observer
BackoffHistogramRegionMiss prometheus.Observer
BackoffHistogramServerBusy prometheus.Observer
BackoffHistogramStaleCmd prometheus.Observer
BackoffHistogramEmpty prometheus.Observer
BackoffHistogramRPC prometheus.Observer
BackoffHistogramLock prometheus.Observer
BackoffHistogramLockFast prometheus.Observer
BackoffHistogramPD prometheus.Observer
BackoffHistogramRegionMiss prometheus.Observer
BackoffHistogramRegionScheduling prometheus.Observer
BackoffHistogramServerBusy prometheus.Observer
BackoffHistogramStaleCmd prometheus.Observer
BackoffHistogramEmpty prometheus.Observer

TxnRegionsNumHistogramWithSnapshot prometheus.Observer
TxnRegionsNumHistogramPrewrite prometheus.Observer
@@ -120,6 +121,7 @@ func initShortcuts() {
BackoffHistogramLockFast = TiKVBackoffHistogram.WithLabelValues("tikvLockFast")
BackoffHistogramPD = TiKVBackoffHistogram.WithLabelValues("pdRPC")
BackoffHistogramRegionMiss = TiKVBackoffHistogram.WithLabelValues("regionMiss")
BackoffHistogramRegionScheduling = TiKVBackoffHistogram.WithLabelValues("regionScheduling")
BackoffHistogramServerBusy = TiKVBackoffHistogram.WithLabelValues("serverBusy")
BackoffHistogramStaleCmd = TiKVBackoffHistogram.WithLabelValues("staleCommand")
BackoffHistogramEmpty = TiKVBackoffHistogram.WithLabelValues("")
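For context, the shortcut pattern this file extends binds one Observer per label at init time so hot paths skip the `WithLabelValues` lookup on every backoff. A minimal, self-contained sketch of that pattern (metric name, buckets, and help text are illustrative stand-ins, not the real definitions behind `TiKVBackoffHistogram`):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Histogram keyed by backoff type, mirroring the shortcut style above:
// bind the Observer once, observe cheaply on every backoff.
var backoffHistogram = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "tikv_backoff_seconds",
		Help:    "backoff duration by type",
		Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
	},
	[]string{"type"},
)

// Pre-bound shortcut, like BackoffHistogramRegionScheduling above.
var backoffHistogramRegionScheduling = backoffHistogram.WithLabelValues("regionScheduling")

func main() {
	prometheus.MustRegister(backoffHistogram)

	start := time.Now()
	time.Sleep(2 * time.Millisecond) // stand-in for a real backoff sleep
	backoffHistogramRegionScheduling.Observe(time.Since(start).Seconds())
	fmt.Println("recorded one regionScheduling backoff")
}
```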
1 change: 1 addition & 0 deletions store/tikv/region_cache.go
@@ -1550,6 +1550,7 @@ func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store {
}

// OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
// It returns whether to retry the request because the region epoch in the cache may be ahead of TiKV's due to slow applying.
func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) (bool, error) {
if len(currentRegions) == 0 {
c.InvalidateCachedRegionWithReason(ctx.Region, EpochNotMatch)
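The new comment covers the case where the cached epoch is ahead of a lagging TiKV replica: invalidating the cache would not help there, so the caller retries instead. A rough sketch of the staleness comparison involved (an assumption for illustration; the real helper in region_cache.go may differ):

```go
package regioncache

import "github.com/pingcap/kvproto/pkg/metapb"

// isEpochStale is a simplified stand-in, not the real helper: an epoch
// lags when its version (bumped by splits/merges) or its conf version
// (bumped by membership changes) is behind the other side's.
func isEpochStale(epoch, current *metapb.RegionEpoch) bool {
	return epoch.GetVersion() < current.GetVersion() ||
		epoch.GetConfVer() < current.GetConfVer()
}
```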
126 changes: 73 additions & 53 deletions store/tikv/region_request.go
@@ -879,8 +879,11 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
if ctx.tryTimes < 1 && req != nil && req.TxnScope == oracle.GlobalTxnScope && req.GetStaleRead() {
*opts = append(*opts, WithLeaderOnly())
}
seed := req.GetReplicaReadSeed()

// NOTE: Please add the region error handler in the same order of errorpb.Error.
metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()

if notLeader := regionErr.GetNotLeader(); notLeader != nil {
// Retry if error is `NotLeader`.
logutil.BgLogger().Debug("tikv reports `NotLeader` retry later",
@@ -891,7 +894,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
leader := notLeader.GetLeader()
if leader == nil {
// The region may be during transferring leader.
if err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("no leader, ctx: %v", ctx)); err != nil {
if err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("no leader, ctx: %v", ctx)); err != nil {
return false, errors.Trace(err)
}
} else {
@@ -904,7 +907,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
// isolated and removed from the Raft group. So it's necessary to reload
// the region from PD.
s.regionCache.InvalidateCachedRegionWithReason(ctx.Region, NoLeader)
if err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil {
if err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil {
return false, errors.Trace(err)
}
return false, nil
@@ -915,17 +918,23 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
}
}

if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil {
// store not match
logutil.BgLogger().Debug("tikv reports `StoreNotMatch` retry later",
zap.Stringer("storeNotMatch", storeNotMatch),
zap.Stringer("ctx", ctx))
ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh)
// This peer is removed from the region. Invalidate the region since it's too stale.
if regionErr.GetRegionNotFound() != nil {
if seed != nil {
logutil.BgLogger().Debug("tikv reports `RegionNotFound` in follow-reader",
zap.Stringer("ctx", ctx), zap.Uint32("seed", *seed))
*seed = *seed + 1
}
s.regionCache.InvalidateCachedRegion(ctx.Region)
return false, nil
}

if regionErr.GetKeyNotInRegion() != nil {
logutil.BgLogger().Debug("tikv reports `KeyNotInRegion`", zap.Stringer("ctx", ctx))
s.regionCache.InvalidateCachedRegion(ctx.Region)
return false, nil
}

seed := req.GetReplicaReadSeed()
if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
logutil.BgLogger().Debug("tikv reports `EpochNotMatch` retry later",
zap.Stringer("EpochNotMatch", epochNotMatch),
@@ -939,6 +948,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
}
return retry, errors.Trace(err)
}

if regionErr.GetServerIsBusy() != nil {
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
@@ -956,11 +966,15 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
}
return true, nil
}

// StaleCommand error indicates the request is sent to the old leader and its term is changed.
// We can't know whether the request is committed or not, so it's an undetermined error too,
// but we don't handle it now.
if regionErr.GetStaleCommand() != nil {
logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx))
if s.leaderReplicaSelector != nil {
// Needn't backoff because stale command indicates the command is sent to the old leader.
// The new leader should be elected soon and the leaderReplicaSelector will try the next peer.
// Needn't backoff because the new leader should be elected soon
// and the leaderReplicaSelector will try the next peer.
} else {
err = bo.Backoff(retry.BoStaleCmd, errors.Errorf("stale command, ctx: %v", ctx))
if err != nil {
@@ -969,36 +983,37 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
}
return true, nil
}

if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil {
// store not match
logutil.BgLogger().Debug("tikv reports `StoreNotMatch` retry later",
zap.Stringer("storeNotMatch", storeNotMatch),
zap.Stringer("ctx", ctx))
ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh)
s.regionCache.InvalidateCachedRegion(ctx.Region)
return false, nil
}

if regionErr.GetRaftEntryTooLarge() != nil {
logutil.BgLogger().Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx))
return false, errors.New(regionErr.String())
}
// A stale read request may be sent to a peer which the data is not ready yet, we should retry in this case.
if regionErr.GetDataIsNotReady() != nil {
logutil.BgLogger().Warn("tikv reports `DataIsNotReady` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx))
// Don't backoff if it's a replica-read.
if seed != nil {
*seed = *seed + 1
} else {
// The region is merging or splitting.
err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("data is not ready, ctx: %v", ctx))
if err != nil {
return false, errors.Trace(err)
}
if s.leaderReplicaSelector != nil {
s.leaderReplicaSelector.rewind()
}

if regionErr.GetMaxTimestampNotSynced() != nil {
logutil.BgLogger().Debug("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoMaxTsNotSynced, errors.Errorf("max timestamp not synced, ctx: %v", ctx))
if err != nil {
return false, errors.Trace(err)
}
if s.leaderReplicaSelector != nil {
s.leaderReplicaSelector.rewind()
}
return true, nil
}

// A read request may be sent to a peer which has not been initialized yet, we should retry in this case.
if regionErr.GetRegionNotInitialized() != nil {
logutil.BgLogger().Warn("tikv reports `RegionNotInitialized` retry later",
logutil.BgLogger().Debug("tikv reports `RegionNotInitialized` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
zap.Stringer("ctx", ctx))
@@ -1007,21 +1022,18 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
}
return true, nil
}
if regionErr.GetMaxTimestampNotSynced() != nil {
logutil.BgLogger().Debug("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoMaxTsNotSynced, errors.Errorf("max timestamp not synced, ctx: %v", ctx))
if err != nil {
return false, errors.Trace(err)
}
if s.leaderReplicaSelector != nil {
s.leaderReplicaSelector.rewind()
}
return true, nil
}

// The read-index can't be handled timely because the region is splitting or merging.
if regionErr.GetReadIndexNotReady() != nil {
logutil.BgLogger().Debug("tikv reports `ReadIndexNotReady`", zap.Stringer("ctx", ctx))
// The region is merging or splitting.
err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("read index not ready, ctx: %v", ctx))
logutil.BgLogger().Debug("tikv reports `ReadIndexNotReady` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
zap.Stringer("ctx", ctx))
if seed != nil {
*seed = *seed + 1
}
// The region can't provide service until split or merge finished, so backoff.
err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("read index not ready, ctx: %v", ctx))
if err != nil {
return false, errors.Trace(err)
}
@@ -1030,10 +1042,11 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
}
return true, nil
}

if regionErr.GetProposalInMergingMode() != nil {
logutil.BgLogger().Debug("tikv reports `ProposalInMergingMode`", zap.Stringer("ctx", ctx))
// The region is merging.
err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("region is merging, ctx: %v", ctx))
// The region is merging and it can't provide service until merge finished, so backoff.
err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("region is merging, ctx: %v", ctx))
if err != nil {
return false, errors.Trace(err)
}
@@ -1042,14 +1055,21 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req
}
return true, nil
}
if regionErr.GetRegionNotFound() != nil {

// A stale read request may be sent to a peer whose data is not ready yet; we should retry in this case.
// This error is specific to stale read and the target replica is randomly selected. If the request is sent
// to the leader, the data must be ready, so we don't backoff here.
if regionErr.GetDataIsNotReady() != nil {
logutil.BgLogger().Warn("tikv reports `DataIsNotReady` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx))
if seed != nil {
logutil.BgLogger().Debug("tikv reports `RegionNotFound` in follow-reader",
zap.Stringer("ctx", ctx), zap.Uint32("seed", *seed))
*seed = *seed + 1
}
s.regionCache.InvalidateCachedRegion(ctx.Region)
return false, nil
return true, nil
}

logutil.BgLogger().Debug("tikv reports region failed",
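Taken together, the region_request.go changes sort the handlers into three rough buckets: scheduling-related errors (no leader, ReadIndexNotReady, ProposalInMergingMode) back off with the new BoRegionScheduling and retry; stale-cache errors (RegionNotFound, KeyNotInRegion, StoreNotMatch) invalidate the cached region and return to the caller; DataIsNotReady on a stale read just advances the replica seed without backing off. A condensed, self-contained sketch of that policy (the types are illustrative stand-ins, not the real tidb ones):

```go
package main

import "fmt"

type regionErrorKind int

const (
	errNotLeaderNoHint regionErrorKind = iota // NotLeader without a leader hint
	errRegionNotFound
	errKeyNotInRegion
	errReadIndexNotReady
	errProposalInMergingMode
	errDataIsNotReady // stale read only
)

type action struct {
	backoff         bool // sleep with a BoRegionScheduling-style config first
	retry           bool // send the request again
	invalidateCache bool // drop the cached region so it is reloaded from PD
}

// classify mirrors the policy above in simplified form.
func classify(k regionErrorKind) action {
	switch k {
	case errNotLeaderNoHint:
		// Likely a leader transfer in progress: wait for it, then retry.
		return action{backoff: true, retry: true}
	case errRegionNotFound, errKeyNotInRegion:
		// The cached region is too stale to be useful.
		return action{invalidateCache: true}
	case errReadIndexNotReady, errProposalInMergingMode:
		// The region can't serve reads until the split/merge finishes.
		return action{backoff: true, retry: true}
	case errDataIsNotReady:
		// Stale read: try another replica immediately, no backoff.
		return action{retry: true}
	}
	return action{}
}

func main() {
	fmt.Printf("%+v\n", classify(errReadIndexNotReady)) // {backoff:true retry:true invalidateCache:false}
	fmt.Printf("%+v\n", classify(errDataIsNotReady))    // {backoff:false retry:true invalidateCache:false}
}
```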
6 changes: 3 additions & 3 deletions store/tikv/region_request_test.go
@@ -190,7 +190,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit(c *C) {

// Test whether the Stale Read request will retry the leader or other peers on error.
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadRetry(c *C) {
var seed uint32 = 0
var seed uint32
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadMixed, &seed)
req.EnableStaleRead()

@@ -1140,14 +1140,14 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector(c *
c.Assert(bo.GetTotalBackoffTimes(), Equals, maxReplicaAttempt+2)
s.cluster.StartStore(s.storeIDs[0])

// Verify that retry the same replica when meets ServerIsBusy/MaxTimestampNotSynced/ReadIndexNotReady/ProposalInMergingMode/DataIsNotReady.
// Verify that retry the same replica when meets ServerIsBusy/MaxTimestampNotSynced/ReadIndexNotReady/ProposalInMergingMode.
for _, regionErr := range []*errorpb.Error{
// ServerIsBusy takes too much time to test.
// {ServerIsBusy: &errorpb.ServerIsBusy{}},
{MaxTimestampNotSynced: &errorpb.MaxTimestampNotSynced{}},
{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}},
{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}},
{DataIsNotReady: &errorpb.DataIsNotReady{}}} {
} {
func() {
oc := sender.client
defer func() {
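The loop above relies on the suite's fault-injection style: wrap the inner client so the first call returns the region error, then let the request succeed, which lets the test assert one backoff plus a retry of the same replica. A self-contained sketch of that shape (stand-in types, not the real tikvrpc/mock client):

```go
package main

import "fmt"

// response is a stand-in for a tikvrpc response carrying a region error.
type response struct{ regionErr string }

type client func() response

// injectOnce returns a client that fails with the given region error on
// the first call only, then delegates to the inner client.
func injectOnce(inner client, errName string) client {
	injected := false
	return func() response {
		if !injected {
			injected = true
			return response{regionErr: errName}
		}
		return inner()
	}
}

func main() {
	c := injectOnce(func() response { return response{} }, "ReadIndexNotReady")
	fmt.Println(c()) // {ReadIndexNotReady} -> triggers backoff and a retry of the same replica
	fmt.Println(c()) // {} -> the retry succeeds
}
```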
1 change: 1 addition & 0 deletions store/tikv/retry/config.go
@@ -89,6 +89,7 @@ var (
BoPDRPC = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
// change base time to 2ms, because it may recover soon.
BoRegionMiss = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
BoTiKVServerBusy = NewConfig("tikvServerBusy", &metrics.BackoffHistogramServerBusy, NewBackoffFnCfg(2000, 10000, EqualJitter), tikverr.ErrTiKVServerBusy)
BoTiFlashServerBusy = NewConfig("tiflashServerBusy", &metrics.BackoffHistogramServerBusy, NewBackoffFnCfg(2000, 10000, EqualJitter), tikverr.ErrTiFlashServerBusy)
BoTxnNotFound = NewConfig("txnNotFound", &metrics.BackoffHistogramEmpty, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrResolveLockTimeout)
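For reference, the new BoRegionScheduling starts at a 2ms base and doubles up to a 500ms cap with no jitter, the same shape as BoRegionMiss, while the server-busy configs use EqualJitter. A small sketch of what those (base, cap, jitter) parameters mean (an approximation of retry.NewBackoffFnCfg's schedule, not the actual implementation):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffSleeps approximates the (base, cap, jitter) schedule used by
// configs like BoRegionScheduling (2ms base, 500ms cap, NoJitter) and
// BoTiKVServerBusy (2000ms base, 10000ms cap, EqualJitter).
func backoffSleeps(baseMs, capMs int, equalJitter bool, attempts int) []time.Duration {
	sleeps := make([]time.Duration, 0, attempts)
	cur := baseMs
	for i := 0; i < attempts; i++ {
		d := cur
		if equalJitter {
			// EqualJitter: half the interval fixed, half random.
			d = cur/2 + rand.Intn(cur/2+1)
		}
		sleeps = append(sleeps, time.Duration(d)*time.Millisecond)
		if cur *= 2; cur > capMs { // exponential growth up to the cap
			cur = capMs
		}
	}
	return sleeps
}

func main() {
	// regionScheduling-like schedule: 2ms, 4ms, 8ms, ... capped at 500ms.
	fmt.Println(backoffSleeps(2, 500, false, 10))
}
```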
