Skip to content

Commit

Permalink
store/tikv: increase max backoff time of read requests (#25153)
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored Jun 6, 2021
1 parent 11716e7 commit 7cc1ebc
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 27 deletions.
47 changes: 32 additions & 15 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,15 @@ func (tm *ttlManager) close() {
close(tm.ch)
}

const pessimisticLockMaxBackoff = 20000
const keepAliveMaxBackoff = 20000 // 20 seconds
const pessimisticLockMaxBackoff = 600000 // 10 minutes
const maxConsecutiveFailure = 10

func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
// Ticker is set to 1/2 of the ManagedLockTTL.
ticker := time.NewTicker(time.Duration(atomic.LoadUint64(&ManagedLockTTL)) * time.Millisecond / 2)
defer ticker.Stop()
keepFail := 0
for {
select {
case <-tm.ch:
Expand All @@ -745,7 +748,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
if tm.lockCtx != nil && tm.lockCtx.Killed != nil && atomic.LoadUint32(tm.lockCtx.Killed) != 0 {
return
}
bo := retry.NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars)
bo := retry.NewBackofferWithVars(context.Background(), keepAliveMaxBackoff, c.txn.vars)
now, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail",
Expand Down Expand Up @@ -774,20 +777,29 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat",
zap.Uint64("startTS", c.startTS), zap.Uint64("newTTL", newTTL))
startTime := time.Now()
_, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL)
_, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL)
if err != nil {
keepFail++
metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
logutil.Logger(bo.GetCtx()).Warn("send TxnHeartBeat failed",
logutil.Logger(bo.GetCtx()).Debug("send TxnHeartBeat failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return
if stopHeartBeat || keepFail > maxConsecutiveFailure {
logutil.Logger(bo.GetCtx()).Warn("stop TxnHeartBeat",
zap.Error(err),
zap.Int("consecutiveFailure", keepFail),
zap.Uint64("txnStartTS", c.startTS))
return
}
continue
}
keepFail = 0
metrics.TxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
}
}
}

func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, ttl uint64) (uint64, error) {
func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) {
req := tikvrpc.NewRequest(tikvrpc.CmdTxnHeartBeat, &pb.TxnHeartBeatRequest{
PrimaryLock: primary,
StartVersion: startTS,
Expand All @@ -796,31 +808,36 @@ func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, tt
for {
loc, err := store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
return 0, errors.Trace(err)
return 0, false, errors.Trace(err)
}
resp, err := store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
return 0, errors.Trace(err)
return 0, false, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return 0, errors.Trace(err)
return 0, false, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return 0, errors.Trace(err)
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return 0, false, errors.Trace(err)
}
}
continue
}
if resp.Resp == nil {
return 0, errors.Trace(tikverr.ErrBodyMissing)
return 0, false, errors.Trace(tikverr.ErrBodyMissing)
}
cmdResp := resp.Resp.(*pb.TxnHeartBeatResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
return 0, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, hex.EncodeToString(primary), extractKeyErr(keyErr))
return 0, true, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, hex.EncodeToString(primary), extractKeyErr(keyErr))
}
return cmdResp.GetLockTtl(), nil
return cmdResp.GetLockTtl(), false, nil
}
}

Expand Down
14 changes: 13 additions & 1 deletion store/tikv/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,22 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
}
same, err := batch.relocate(bo, c.store.regionCache)
if err != nil {
return errors.Trace(err)
}
if same {
continue
}
err = c.pessimisticLockMutations(bo, action.LockCtx, batch.mutations)
return errors.Trace(err)
}
Expand Down
13 changes: 9 additions & 4 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Scanner) Value() []byte {
return nil
}

const scannerNextMaxBackoff = 20000
const scannerNextMaxBackoff = 600000 // 10 minutes

// Next return next element.
func (s *Scanner) Next() error {
Expand Down Expand Up @@ -229,9 +229,14 @@ func (s *Scanner) getData(bo *Backoffer) error {
if regionErr != nil {
logutil.BgLogger().Debug("scanner getData failed",
zap.Stringer("regionErr", regionErr))
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
}
continue
}
Expand Down
42 changes: 36 additions & 6 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnaps
}
}

const batchGetMaxBackoff = 20000
const batchGetMaxBackoff = 600000 // 10 minutes

// SetSnapshotTS resets the timestamp for reads.
func (s *KVSnapshot) SetSnapshotTS(ts uint64) {
Expand Down Expand Up @@ -235,6 +235,19 @@ type batchKeys struct {
keys [][]byte
}

func (b *batchKeys) relocate(bo *Backoffer, c *RegionCache) (bool, error) {
begin, end := b.keys[0], b.keys[len(b.keys)-1]
loc, err := c.LocateKey(bo, begin)
if err != nil {
return false, errors.Trace(err)
}
if !loc.Contains(end) {
return false, nil
}
b.region = loc.Region
return true, nil
}

// appendBatchKeysBySize appends keys to b. It may split the keys to make
// sure each batch's size does not exceed the limit.
func appendBatchKeysBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int, limit int) []batchKeys {
Expand Down Expand Up @@ -339,10 +352,22 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
}
same, err := batch.relocate(bo, cli.regionCache)
if err != nil {
return errors.Trace(err)
}
if same {
continue
}
err = s.batchGetKeysByRegions(bo, pending, collectF)
return errors.Trace(err)
}
Expand Down Expand Up @@ -402,7 +427,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec
}
}

const getMaxBackoff = 20000
const getMaxBackoff = 600000 // 10 minutes

// Get gets the value for key k from snapshot.
func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) {
Expand Down Expand Up @@ -497,9 +522,14 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
return nil, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, errors.Trace(err)
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
if regionErr.GetEpochNotMatch() == nil || isFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, errors.Trace(err)
}
}
continue
}
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (s StoreProbe) ClearTxnLatches() {
// SendTxnHeartbeat renews a txn's ttl.
func (s StoreProbe) SendTxnHeartbeat(ctx context.Context, key []byte, startTS uint64, ttl uint64) (uint64, error) {
bo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil)
return sendTxnHeartBeat(bo, s.KVStore, key, startTS, ttl)
newTTL, _, err := sendTxnHeartBeat(bo, s.KVStore, key, startTS, ttl)
return newTTL, err
}

// LoadSafePoint from safepoint kv.
Expand Down

0 comments on commit 7cc1ebc

Please sign in to comment.