Skip to content

Commit

Permalink
store/tikv: define KVFilter for customized mutation initialization (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored Apr 15, 2021
1 parent 2852b31 commit c46cc91
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 1 deletion.
9 changes: 9 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type tikvTxn struct {

// NewTiKVTxn returns a new Transaction.
func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction {
txn.SetOption(tikvstore.KVFilter, TiDBKVFilter{})
return &tikvTxn{txn, make(map[int64]*model.TableInfo)}
}

Expand Down Expand Up @@ -141,3 +142,11 @@ func (txn *tikvTxn) extractKeyExistsErr(key kv.Key) error {
}
return extractKeyExistsErrFromIndex(key, value, tblInfo, indexID)
}

// TiDBKVFilter is the filter specific to TiDB to filter out KV pairs that needn't be committed.
type TiDBKVFilter struct{}

// IsUnnecessaryKeyValue defines which kinds of KV pairs from TiDB needn't be committed.
func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool {
return tablecodec.IsUntouchedIndexKValue(key, value)
}
13 changes: 12 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ func (c *twoPhaseCommitter) extractKeyExistsErr(err *kv.ErrKeyExist) error {
return errors.Trace(err)
}

// KVFilter is a filter that filters out unnecessary KV pairs.
type KVFilter interface {
// IsUnnecessaryKeyValue returns whether this KV pair should be committed.
IsUnnecessaryKeyValue(key, value []byte, flags kv.KeyFlags) bool
}

func (c *twoPhaseCommitter) initKeysAndMutations() error {
var size, putCnt, delCnt, lockCnt, checkCnt int

Expand All @@ -322,6 +328,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
sizeHint := txn.us.GetMemBuffer().Len()
c.mutations = newMemBufferMutations(sizeHint, memBuf)
c.isPessimistic = txn.IsPessimistic()
filter := txn.getKVFilter()

var err error
for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() {
Expand All @@ -340,10 +347,14 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
} else {
value = it.Value()
if len(value) > 0 {
if tablecodec.IsUntouchedIndexKValue(key, value) {
isUnnecessaryKV := filter != nil && filter.IsUnnecessaryKeyValue(key, value, flags)
if isUnnecessaryKV {
if !flags.HasLocked() {
continue
}
// If the key was locked before, we should prewrite the lock even if
// the KV needn't be committed according to the filter. Otherwise, we
// were forgetting removing pessimistic locks added before.
op = pb.Op_Lock
lockCnt++
} else {
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
IsStalenessReadOnly
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
// KVFilter filters out the key-value pairs in the memBuf that is unnecessary to be committed
KVFilter
)

// Priority value for transaction priority.
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
drivertxn "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
Expand Down Expand Up @@ -1032,6 +1033,7 @@ func (s *testCommitterSuite) TestResolvePessimisticLock(c *C) {
noValueIndexKey := []byte("t00000001_i000000002")
c.Assert(tablecodec.IsUntouchedIndexKValue(untouchedIndexKey, untouchedIndexValue), IsTrue)
txn := s.begin(c)
txn.SetOption(kv.KVFilter, drivertxn.TiDBKVFilter{})
err := txn.Set(untouchedIndexKey, untouchedIndexValue)
c.Assert(err, IsNil)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tidbkv.LockNoWait}
Expand Down
7 changes: 7 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ func (txn *KVTxn) IsPessimistic() bool {
return txn.us.GetOption(kv.Pessimistic) != nil
}

func (txn *KVTxn) getKVFilter() KVFilter {
if filter := txn.us.GetOption(kv.KVFilter); filter != nil {
return filter.(KVFilter)
}
return nil
}

// Commit commits the transaction operations to KV store.
func (txn *KVTxn) Commit(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down

0 comments on commit c46cc91

Please sign in to comment.