Skip to content

Commit

Permalink
store/tikv: tiny refactoring, change twoPhaseCommitAction to interf…
Browse files Browse the repository at this point in the history
…ace (pingcap#12845)
  • Loading branch information
tiancaiamao authored and sre-bot committed Oct 21, 2019
1 parent 50c60ae commit 16f3e6a
Showing 1 changed file with 61 additions and 71 deletions.
132 changes: 61 additions & 71 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,23 @@ import (
"go.uber.org/zap"
)

type twoPhaseCommitAction int
type twoPhaseCommitAction interface {
handleSingleBatch(*twoPhaseCommitter, *Backoffer, batchKeys) error
String() string
}

const (
actionPrewrite twoPhaseCommitAction = 1 + iota
actionCommit
actionCleanup
actionPessimisticLock
actionPessimisticRollback
type actionPrewrite struct{}
type actionCommit struct{}
type actionCleanup struct{}
type actionPessimisticLock struct{}
type actionPessimisticRollback struct{}

var (
_ twoPhaseCommitAction = actionPrewrite{}
_ twoPhaseCommitAction = actionCommit{}
_ twoPhaseCommitAction = actionCleanup{}
_ twoPhaseCommitAction = actionPessimisticLock{}
_ twoPhaseCommitAction = actionPessimisticRollback{}
)

var (
Expand All @@ -60,24 +69,28 @@ var (
PessimisticLockTTL uint64 = 15000 // 15s ~ 40s
)

func (ca twoPhaseCommitAction) String() string {
switch ca {
case actionPrewrite:
return "prewrite"
case actionCommit:
return "commit"
case actionCleanup:
return "cleanup"
case actionPessimisticLock:
return "pessimistic_lock"
case actionPessimisticRollback:
return "pessimistic_rollback"
}
return "unknown"
func (actionPrewrite) String() string {
return "prewrite"
}

func (actionCommit) String() string {
return "commit"
}

func (actionCleanup) String() string {
return "cleanup"
}

func (actionPessimisticLock) String() string {
return "pessimistic_lock"
}

// MetricsTag returns detail tag for metrics.
func (ca twoPhaseCommitAction) MetricsTag() string {
func (actionPessimisticRollback) String() string {
return "pessimistic_rollback"
}

// metricsTag returns detail tag for metrics.
func metricsTag(ca twoPhaseCommitAction) string {
return "2pc_" + ca.String()
}

Expand Down Expand Up @@ -120,13 +133,10 @@ type batchExecutor struct {
rateLimiter *rateLimit // rate limiter for concurrency control, maybe more strategies
committer *twoPhaseCommitter // here maybe more different type committer in the future
action twoPhaseCommitAction // the work action type
procFn procOneBatchFn // injected proc batch function
backoffer *Backoffer // Backoffer
tokenWaitDuration time.Duration // get token wait time
}

type procOneBatchFn func(bo *Backoffer, batch batchKeys) error

type mutationEx struct {
pb.Mutation
asserted bool
Expand Down Expand Up @@ -374,11 +384,11 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
return errors.Trace(err)
}

metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(action.MetricsTag()).Observe(float64(len(groups)))
metrics.TiKVTxnRegionsNumHistogram.WithLabelValues(metricsTag(action)).Observe(float64(len(groups)))

var batches []batchKeys
var sizeFunc = c.keySize
if action == actionPrewrite {
if _, ok := action.(actionPrewrite); ok {
// Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest.
if len(bo.errors) == 0 {
for region, keys := range groups {
Expand All @@ -396,15 +406,17 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
}

firstIsPrimary := bytes.Equal(keys[0], c.primary())
if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
_, actionIsCommit := action.(actionCommit)
_, actionIsCleanup := action.(actionCleanup)
if firstIsPrimary && (actionIsCommit || actionIsCleanup) {
// primary should be committed/cleanup first
err = c.doActionOnBatches(bo, action, batches[:1])
if err != nil {
return errors.Trace(err)
}
batches = batches[1:]
}
if action == actionCommit {
if actionIsCommit {
// Commit secondary batches in background goroutine to reduce latency.
// The backoffer instance is created outside of the goroutine to avoid
// potential data race in unit test since `CommitMaxBackoff` will be updated
Expand All @@ -428,15 +440,12 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA

// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseCommitAction, batches []batchKeys) error {
singleBatchActionFunc, err := c.getProcFuncByType(action)
if err != nil {
return err
}
if len(batches) == 0 {
return nil
}

if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
e := action.handleSingleBatch(c, bo, batches[0])
if e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
Expand All @@ -447,30 +456,11 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
return errors.Trace(e)
}
rateLim := len(batches) // this will be used for LargeTxn, set rateLim here
batchExecutor := newBatchExecutor(rateLim, c, action, singleBatchActionFunc, bo)
err = batchExecutor.process(batches)
batchExecutor := newBatchExecutor(rateLim, c, action, bo)
err := batchExecutor.process(batches)
return errors.Trace(err)
}

func (c *twoPhaseCommitter) getProcFuncByType(action twoPhaseCommitAction) (procOneBatchFn, error) {
var singleBatchActionFunc procOneBatchFn
switch action {
case actionPrewrite:
singleBatchActionFunc = c.prewriteSingleBatch
case actionCommit:
singleBatchActionFunc = c.commitSingleBatch
case actionCleanup:
singleBatchActionFunc = c.cleanupSingleBatch
case actionPessimisticLock:
singleBatchActionFunc = c.pessimisticLockSingleBatch
case actionPessimisticRollback:
singleBatchActionFunc = c.pessimisticRollbackSingleBatch
default:
return nil, errors.Errorf("invalid action type=%v", action)
}
return singleBatchActionFunc, nil
}

func (c *twoPhaseCommitter) keyValueSize(key []byte) int {
size := len(key)
if mutation := c.mutations[string(key)]; mutation != nil {
Expand Down Expand Up @@ -508,7 +498,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchKeys, txnSize uint64
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
}

func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
txnSize := uint64(c.regionTxnSize[batch.region.id])
// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
// to MaxUint64 to avoid unexpected "resolve lock lite".
Expand Down Expand Up @@ -653,7 +643,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
}
}

func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mut := &pb.Mutation{
Expand Down Expand Up @@ -730,7 +720,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
}

func (c *twoPhaseCommitter) pessimisticRollbackSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &pb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
Expand Down Expand Up @@ -801,7 +791,7 @@ func (c *twoPhaseCommitter) getUndeterminedErr() error {
return c.mu.undeterminedErr
}

func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
Expand Down Expand Up @@ -873,7 +863,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
return nil
}

func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) error {
func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{
Keys: batch.keys,
StartVersion: c.startTS,
Expand Down Expand Up @@ -911,7 +901,7 @@ func (c *twoPhaseCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error {
bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1)
}

return c.doActionOnKeys(bo, actionPrewrite, keys)
return c.doActionOnKeys(bo, actionPrewrite{}, keys)
}

func (c *twoPhaseCommitter) commitKeys(bo *Backoffer, keys [][]byte) error {
Expand All @@ -921,19 +911,19 @@ func (c *twoPhaseCommitter) commitKeys(bo *Backoffer, keys [][]byte) error {
bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1)
}

return c.doActionOnKeys(bo, actionCommit, keys)
return c.doActionOnKeys(bo, actionCommit{}, keys)
}

func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionCleanup, keys)
return c.doActionOnKeys(bo, actionCleanup{}, keys)
}

func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticLock, keys)
return c.doActionOnKeys(bo, actionPessimisticLock{}, keys)
}

func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticRollback, keys)
return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys)
}

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
Expand Down Expand Up @@ -1148,9 +1138,9 @@ func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn

// newBatchExecutor create processor to handle concurrent batch works(prewrite/commit etc)
func newBatchExecutor(rateLimit int, committer *twoPhaseCommitter,
action twoPhaseCommitAction, procFn procOneBatchFn, backoffer *Backoffer) *batchExecutor {
action twoPhaseCommitAction, backoffer *Backoffer) *batchExecutor {
return &batchExecutor{rateLimit, nil, committer,
action, procFn, backoffer, time.Duration(1 * time.Millisecond)}
action, backoffer, time.Duration(1 * time.Millisecond)}
}

// initUtils do initialize batchExecutor related policies like rateLimit util
Expand All @@ -1170,7 +1160,7 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error,
go func() {
defer batchExe.rateLimiter.putToken()
var singleBatchBackoffer *Backoffer
if batchExe.action == actionCommit {
if _, ok := batchExe.action.(actionCommit); ok {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
Expand All @@ -1185,7 +1175,7 @@ func (batchExe *batchExecutor) startWorker(exitCh chan struct{}, ch chan error,
defer singleBatchCancel()
}
beforeSleep := singleBatchBackoffer.totalSleep
ch <- batchExe.procFn(singleBatchBackoffer, batch)
ch <- batchExe.action.handleSingleBatch(batchExe.committer, singleBatchBackoffer, batch)
commitDetail := batchExe.committer.getDetail()
if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil
if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 {
Expand Down Expand Up @@ -1217,7 +1207,7 @@ func (batchExe *batchExecutor) process(batches []batchKeys) error {
// For prewrite, stop sending other requests after receiving first error.
backoffer := batchExe.backoffer
var cancel context.CancelFunc
if batchExe.action == actionPrewrite {
if _, ok := batchExe.action.(actionPrewrite); ok {
backoffer, cancel = batchExe.backoffer.Fork()
defer cancel()
}
Expand Down

0 comments on commit 16f3e6a

Please sign in to comment.