Skip to content

Commit

Permalink
store/tikv: Add more failpoints about transaction (pingcap#22160)
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta authored Jan 10, 2021
1 parent 8cc5e17 commit c9ff845
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 25 deletions.
45 changes: 44 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/hex"
"math"
"math/rand"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -741,6 +742,20 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
secondaryBo := NewBackofferWithVars(context.Background(), int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars)
go func() {
if c.connID > 0 {
failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) {
if s, ok := v.(string); !ok {
logutil.Logger(bo.ctx).Info("[failpoint] sleep 2s before commit secondary keys",
zap.Uint64("connID", c.connID), zap.Uint64("txnStartTS", c.startTS), zap.Uint64("txnCommitTS", c.commitTS))
time.Sleep(2 * time.Second)
} else if s == "skip" {
logutil.Logger(bo.ctx).Info("[failpoint] injected skip committing secondaries",
zap.Uint64("connID", c.connID), zap.Uint64("txnStartTS", c.startTS), zap.Uint64("txnCommitTS", c.commitTS))
failpoint.Return()
}
})
}

e := c.doActionOnBatches(secondaryBo, action, batchBuilder.allBatches())
if e != nil {
logutil.BgLogger().Debug("2PC async doActionOnBatches",
Expand Down Expand Up @@ -1009,6 +1024,13 @@ func (c *twoPhaseCommitter) checkOnePCFallBack(action twoPhaseCommitAction, batc
func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
c.cleanWg.Add(1)
go func() {
failpoint.Inject("commitFailedSkipCleanup", func() {
logutil.Logger(ctx).Info("[failpoint] injected skip cleanup secondaries on failure",
zap.Uint64("txnStartTS", c.startTS))
c.cleanWg.Done()
failpoint.Return()
})

cleanupKeysCtx := context.WithValue(context.Background(), txnStartKey, ctx.Value(txnStartKey))
err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
if err != nil {
Expand Down Expand Up @@ -1239,7 +1261,24 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
}

if c.connID > 0 {
failpoint.Inject("beforeCommit", func() {})
failpoint.Inject("beforeCommit", func(val failpoint.Value) {
// Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like
// `return("delay,fail")`. They are then executed one after another, in the order given.
if v, ok := val.(string); ok {
for _, action := range strings.Split(v, ",") {
// Async commit transactions cannot return error here, since it's already successful.
if action == "fail" && !c.isAsyncCommit() {
logutil.Logger(ctx).Info("[failpoint] injected failure before commit", zap.Uint64("txnStartTS", c.startTS))
failpoint.Return(errors.New("injected failure before commit"))
} else if action == "delay" {
duration := time.Duration(rand.Int63n(int64(time.Second) * 5))
logutil.Logger(ctx).Info("[failpoint] injected delay before commit",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration))
time.Sleep(duration)
}
}
}
})
}

if c.isAsyncCommit() {
Expand Down Expand Up @@ -1604,6 +1643,10 @@ func newBatched(primaryKey []byte) *batched {
// appendBatchMutationsBySize appends mutations to b. It may split the keys to make
// sure each batch's size does not exceed the limit.
func (b *batched) appendBatchMutationsBySize(region RegionVerID, mutations CommitterMutations, sizeFn func(k, v []byte) int, limit int) {
failpoint.Inject("twoPCRequestBatchSizeLimit", func() {
limit = 1
})

var start, end int
for start = 0; start < mutations.Len(); start = end {
var size int
Expand Down
24 changes: 24 additions & 0 deletions store/tikv/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package tikv

import (
"math/rand"
"strings"
"sync/atomic"
"time"

Expand All @@ -23,7 +25,9 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

type actionPessimisticLock struct {
Expand Down Expand Up @@ -218,6 +222,26 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Bac
}

// pessimisticLockMutations runs the actionPessimisticLock action, built from
// lockCtx, over mutations via doActionOnMutations and returns its result.
//
// When c.connID is greater than zero, the "beforePessimisticLock" failpoint is
// consulted first. Its value, if it is a string, is treated as a
// comma-separated list of instructions that are applied in order:
//   - "delay": sleep for a random duration in [0s, 5s).
//   - "fail":  log and hand an injected error to failpoint.Return.
//
// Unrecognized instructions and non-string values are ignored.
func (c *twoPhaseCommitter) pessimisticLockMutations(bo *Backoffer, lockCtx *kv.LockCtx, mutations CommitterMutations) error {
	if c.connID > 0 {
		failpoint.Inject("beforePessimisticLock", func(val failpoint.Value) {
			// Several instructions may be packed into one string, separated by
			// commas, e.g. `return("delay,fail")`; each is handled in turn.
			if instructions, isString := val.(string); isString {
				for _, instruction := range strings.Split(instructions, ",") {
					switch instruction {
					case "delay":
						pause := time.Duration(rand.Int63n(5 * int64(time.Second)))
						logutil.Logger(bo.ctx).Info("[failpoint] injected delay at pessimistic lock",
							zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", pause))
						time.Sleep(pause)
					case "fail":
						logutil.Logger(bo.ctx).Info("[failpoint] injected failure at pessimistic lock",
							zap.Uint64("txnStartTS", c.startTS))
						failpoint.Return(errors.New("injected failure at pessimistic lock"))
					}
				}
			}
		})
	}

	return c.doActionOnMutations(bo, actionPessimisticLock{lockCtx}, mutations)
}

Expand Down
27 changes: 26 additions & 1 deletion store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tikv

import (
"encoding/hex"
"math"
"sync/atomic"
"time"
Expand Down Expand Up @@ -72,11 +73,25 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
}
})

ttl := c.lockTTL

if c.connID > 0 {
failpoint.Inject("twoPCShortLockTTL", func() {
ttl = 1
keys := make([]string, 0, len(mutations))
for _, m := range mutations {
keys = append(keys, hex.EncodeToString(m.Key))
}
logutil.BgLogger().Info("[failpoint] injected lock ttl = 1 on prewrite",
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys))
})
}

req := &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: c.lockTTL,
LockTtl: ttl,
IsPessimisticLock: isPessimisticLock,
ForUpdateTs: c.forUpdateTS,
TxnSize: txnSize,
Expand Down Expand Up @@ -111,6 +126,16 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
// regions. It invokes `prewriteMutations` recursively here, and the number of batches will be
// checked there.

if c.connID > 0 {
failpoint.Inject("prewritePrimaryFail", func() {
if batch.isPrimary {
logutil.Logger(bo.ctx).Info("[failpoint] injected error on prewriting primary batch",
zap.Uint64("txnStartTS", c.startTS))
failpoint.Return(errors.New("injected error on prewriting primary batch"))
}
})
}

txnSize := uint64(c.regionTxnSize[batch.region.id])
// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
// to MaxUint64 to avoid unexpected "resolve lock lite".
Expand Down
74 changes: 51 additions & 23 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -420,34 +421,61 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext,
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
defer cancel()
}
start := time.Now()
resp, err = s.client.SendRequest(ctx, rpcCtx.Addr, req, timeout)
if s.Stats != nil {
recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
failpoint.Inject("tikvStoreRespResult", func(val failpoint.Value) {
if val.(bool) {
if req.Type == tikvrpc.CmdCop && bo.totalSleep == 0 {
failpoint.Return(&tikvrpc.Response{
Resp: &coprocessor.Response{RegionError: &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}},
}, false, nil)
}
}

var connID uint64
if v := bo.ctx.Value(sessionctx.ConnID); v != nil {
connID = v.(uint64)
}

injectFailOnSend := false
if connID > 0 {
failpoint.Inject("rpcFailOnSend", func() {
logutil.Logger(ctx).Info("[failpoint] injected RPC error on send", zap.Stringer("type", req.Type),
zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("ctx", &req.Context))
injectFailOnSend = true
err = errors.New("injected RPC error on send")
})
}

failpoint.Inject("rpcContextCancelErr", func(val failpoint.Value) {
if val.(bool) {
ctx1, cancel := context.WithCancel(context.Background())
cancel()
select {
case <-ctx1.Done():
}
if !injectFailOnSend {
start := time.Now()
resp, err = s.client.SendRequest(ctx, rpcCtx.Addr, req, timeout)
if s.Stats != nil {
recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
failpoint.Inject("tikvStoreRespResult", func(val failpoint.Value) {
if val.(bool) {
if req.Type == tikvrpc.CmdCop && bo.totalSleep == 0 {
failpoint.Return(&tikvrpc.Response{
Resp: &coprocessor.Response{RegionError: &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}},
}, false, nil)
}
}
})
}

ctx = ctx1
err = ctx.Err()
resp = nil
if connID > 0 {
failpoint.Inject("rpcFailOnRecv", func() {
logutil.Logger(ctx).Info("[failpoint] injected RPC error on recv", zap.Stringer("type", req.Type),
zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("ctx", &req.Context))
err = errors.New("injected RPC error on recv")
resp = nil
})
}
})

failpoint.Inject("rpcContextCancelErr", func(val failpoint.Value) {
if val.(bool) {
ctx1, cancel := context.WithCancel(context.Background())
cancel()
select {
case <-ctx1.Done():
}

ctx = ctx1
err = ctx.Err()
resp = nil
}
})
}

if err != nil {
s.rpcError = err
Expand Down

0 comments on commit c9ff845

Please sign in to comment.