Skip to content

Commit

Permalink
store: check constraint for "Delete-Your-Writes" records when txn com…
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored and qw4990 committed May 28, 2020
1 parent 21c8f8c commit 0bd7f3f
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 16 deletions.
77 changes: 77 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"sync"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -2560,6 +2561,56 @@ func (s *testSuite4) TestRebaseIfNeeded(c *C) {
tk.MustQuery(`select a from t where b = 6;`).Check(testkit.Rows("30003"))
}

func (s *testSuite4) TestDeferConstraintCheckForDelete(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set tidb_constraint_check_in_place = 0")
tk.MustExec("set @@tidb_txn_mode = 'optimistic'")
tk.MustExec("use test")

tk.MustExec("drop table if exists t1, t2, t3, t4, t5")
tk.MustExec("create table t1(i int primary key, j int)")
tk.MustExec("insert into t1 values(1, 2)")
tk.MustExec("begin")
tk.MustExec("insert into t1 values(1, 3)")
tk.MustExec("delete from t1 where j = 3")
_, err := tk.Exec("commit")
c.Assert(err.Error(), Equals, "previous statement: delete from t1 where j = 3: [kv:1062]Duplicate entry '1' for key 'PRIMARY'")
tk.MustExec("rollback")

tk.MustExec("create table t2(i int, j int, unique index idx(i))")
tk.MustExec("insert into t2 values(1, 2)")
tk.MustExec("begin")
tk.MustExec("insert into t2 values(1, 3)")
tk.MustExec("delete from t2 where j = 3")
_, err = tk.Exec("commit")
c.Assert(err.Error(), Equals, "previous statement: delete from t2 where j = 3: [kv:1062]Duplicate entry '1' for key 'idx'")
tk.MustExec("admin check table t2")

tk.MustExec("create table t3(i int, j int, primary key(i))")
tk.MustExec("begin")
tk.MustExec("insert into t3 values(1, 3)")
tk.MustExec("delete from t3 where j = 3")
tk.MustExec("commit")

tk.MustExec("create table t4(i int, j int, primary key(i))")
tk.MustExec("begin")
tk.MustExec("insert into t4 values(1, 3)")
tk.MustExec("delete from t4 where j = 3")
tk.MustExec("insert into t4 values(2, 3)")
tk.MustExec("commit")
tk.MustExec("admin check table t4")
tk.MustQuery("select * from t4").Check(testkit.Rows("2 3"))

tk.MustExec("create table t5(i int, j int, primary key(i))")
tk.MustExec("begin")
tk.MustExec("insert into t5 values(1, 3)")
tk.MustExec("delete from t5 where j = 3")
tk.MustExec("insert into t5 values(1, 4)")
tk.MustExec("commit")
tk.MustExec("admin check table t5")
tk.MustQuery("select * from t5").Check(testkit.Rows("1 4"))
}

func (s *testSuite4) TestDeferConstraintCheckForInsert(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
Expand All @@ -2574,6 +2625,32 @@ func (s *testSuite4) TestDeferConstraintCheckForInsert(c *C) {
tk.MustQuery(`select * from t;`).Check(testkit.Rows("2"))
}

func (s *testSuite4) TestPessimisticDeleteYourWrites(c *C) {
session1 := testkit.NewTestKitWithInit(c, s.store)
session2 := testkit.NewTestKitWithInit(c, s.store)

session1.MustExec("drop table if exists x;")
session1.MustExec("create table x (id int primary key, c int);")

session1.MustExec("set tidb_txn_mode = 'pessimistic'")
session2.MustExec("set tidb_txn_mode = 'pessimistic'")

session1.MustExec("begin;")
session1.MustExec("insert into x select 1, 1")
session1.MustExec("delete from x where id = 1")
session2.MustExec("begin;")
var wg sync.WaitGroup
wg.Add(1)
go func() {
session2.MustExec("insert into x select 1, 2")
wg.Done()
}()
session1.MustExec("commit;")
wg.Wait()
session2.MustExec("commit;")
session2.MustQuery("select * from x").Check(testkit.Rows("1 2"))
}

func (s *testSuite4) TestDefEnumInsert(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d
github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623
github.com/pingcap/pd v1.1.0-beta.0.20191223090411-ea2b748f6ee2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d h1:zTHgLr8+0LTEJmjf8yHilgmNhdrVlCN/RW7NeO8IRsE=
github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0 h1:dXXNHvDwAEN1YNgv+PSbmAdMqhb0U9Sr2/QZ/wCewSI=
github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 h1:/BJjVyJlNKWMMrgPsbzk5Y9VPJWwHKYttj3oWxnFQ9U=
Expand Down
6 changes: 5 additions & 1 deletion store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,8 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
// If the operation is Insert, check if key is exists at first.
var err error
// no need to check insert values for pessimistic transaction.
if m.GetOp() == kvrpcpb.Op_Insert && forUpdateTS == 0 {
op := m.GetOp()
if (op == kvrpcpb.Op_Insert || op == kvrpcpb.Op_CheckNotExists) && forUpdateTS == 0 {
v, err := mvcc.getValue(m.Key, startTS, kvrpcpb.IsolationLevel_SI)
if err != nil {
errs = append(errs, err)
Expand All @@ -657,6 +658,9 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
continue
}
}
if op == kvrpcpb.Op_CheckNotExists {
continue
}
isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i]
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock)
errs = append(errs, err)
Expand Down
57 changes: 45 additions & 12 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ type twoPhaseCommitter struct {
// maxTxnTimeUse represents max time a Txn may use (in ms) from its startTS to commitTS.
// We use it to guarantee GC worker will not influence any active txn. The value
// should be less than GC life time.
maxTxnTimeUse uint64
detail unsafe.Pointer
primaryKey []byte
forUpdateTS uint64
pessimisticTTL uint64
maxTxnTimeUse uint64
detail unsafe.Pointer
primaryKey []byte
forUpdateTS uint64
pessimisticTTL uint64
noNeedCommitKeys map[string]struct{}

mu struct {
sync.RWMutex
Expand Down Expand Up @@ -197,11 +198,12 @@ func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS,

func (c *twoPhaseCommitter) initKeysAndMutations() error {
var (
keys [][]byte
size int
putCnt int
delCnt int
lockCnt int
keys [][]byte
size int
putCnt int
delCnt int
lockCnt int
noNeedCommitKey = make(map[string]struct{})
)
mutations := make(map[string]*mutationEx)
txn := c.txn
Expand Down Expand Up @@ -231,13 +233,24 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
}
putCnt++
} else {
var op pb.Op
if !txn.IsPessimistic() && txn.us.LookupConditionPair(k) != nil {
// delete-your-writes keys in optimistic txn need check not exists in prewrite-phase
// due to `Op_CheckNotExists` doesn't prewrite lock, so mark those keys should not be used in commit-phase.
op = pb.Op_CheckNotExists
noNeedCommitKey[string(k)] = struct{}{}
} else {
// normal delete keys in optimistic txn can be delete without not exists checking
// delete-your-writes keys in pessimistic txn can ensure must be no exists so can directly delete them
op = pb.Op_Del
delCnt++
}
mutations[string(k)] = &mutationEx{
Mutation: pb.Mutation{
Op: pb.Op_Del,
Op: op,
Key: k,
},
}
delCnt++
}
if c.isPessimistic {
if !bytes.Equal(k, c.primaryKey) {
Expand Down Expand Up @@ -314,6 +327,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
zap.Int("puts", putCnt),
zap.Int("dels", delCnt),
zap.Int("locks", lockCnt),
zap.Int("checks", len(noNeedCommitKey)),
zap.Uint64("txnStartTS", txn.startTS))
}

Expand All @@ -334,6 +348,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
c.maxTxnTimeUse = maxTxnTimeUse
c.keys = keys
c.noNeedCommitKeys = noNeedCommitKey
c.mutations = mutations
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = getTxnPriority(txn)
Expand Down Expand Up @@ -1147,6 +1162,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
return errors.Trace(err)
}

// strip check_not_exists keys that no need to commit.
c.stripNoNeedCommitKeys()

start = time.Now()
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
if err != nil {
Expand Down Expand Up @@ -1212,6 +1230,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
return nil
}

func (c *twoPhaseCommitter) stripNoNeedCommitKeys() {
if len(c.noNeedCommitKeys) == 0 {
return
}
var i int
for _, k := range c.keys {
if _, ck := c.noNeedCommitKeys[string(k)]; ck {
continue
}
c.keys[i] = k
i++
}
c.keys = c.keys[:i]
}

type schemaLeaseChecker interface {
Check(txnTS uint64) error
}
Expand Down

0 comments on commit 0bd7f3f

Please sign in to comment.