From 0bd7f3f34db09c3fc4883360f74ca501805067f1 Mon Sep 17 00:00:00 2001 From: pingcap-github-bot Date: Wed, 11 Mar 2020 21:29:32 +0800 Subject: [PATCH] store: check constraint for "Delete-Your-Writes" records when txn commit (#14968) (#15176) --- executor/write_test.go | 77 ++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- store/mockstore/mocktikv/mvcc_leveldb.go | 6 +- store/tikv/2pc.go | 57 ++++++++++++++---- 5 files changed, 130 insertions(+), 16 deletions(-) diff --git a/executor/write_test.go b/executor/write_test.go index 305188ac17bc2..a1fbb5f8d2f5f 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "sync" . "github.com/pingcap/check" "github.com/pingcap/parser/model" @@ -2560,6 +2561,56 @@ func (s *testSuite4) TestRebaseIfNeeded(c *C) { tk.MustQuery(`select a from t where b = 6;`).Check(testkit.Rows("30003")) } +func (s *testSuite4) TestDeferConstraintCheckForDelete(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set tidb_constraint_check_in_place = 0") + tk.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk.MustExec("use test") + + tk.MustExec("drop table if exists t1, t2, t3, t4, t5") + tk.MustExec("create table t1(i int primary key, j int)") + tk.MustExec("insert into t1 values(1, 2)") + tk.MustExec("begin") + tk.MustExec("insert into t1 values(1, 3)") + tk.MustExec("delete from t1 where j = 3") + _, err := tk.Exec("commit") + c.Assert(err.Error(), Equals, "previous statement: delete from t1 where j = 3: [kv:1062]Duplicate entry '1' for key 'PRIMARY'") + tk.MustExec("rollback") + + tk.MustExec("create table t2(i int, j int, unique index idx(i))") + tk.MustExec("insert into t2 values(1, 2)") + tk.MustExec("begin") + tk.MustExec("insert into t2 values(1, 3)") + tk.MustExec("delete from t2 where j = 3") + _, err = tk.Exec("commit") + c.Assert(err.Error(), Equals, "previous statement: delete from t2 where j = 3: [kv:1062]Duplicate entry '1' for key 'idx'") + tk.MustExec("admin check table t2") + + tk.MustExec("create table t3(i int, j int, primary key(i))") + tk.MustExec("begin") + tk.MustExec("insert into t3 values(1, 3)") + tk.MustExec("delete from t3 where j = 3") + tk.MustExec("commit") + + tk.MustExec("create table t4(i int, j int, primary key(i))") + tk.MustExec("begin") + tk.MustExec("insert into t4 values(1, 3)") + tk.MustExec("delete from t4 where j = 3") + tk.MustExec("insert into t4 values(2, 3)") + tk.MustExec("commit") + tk.MustExec("admin check table t4") + tk.MustQuery("select * from t4").Check(testkit.Rows("2 3")) + + tk.MustExec("create table t5(i int, j int, primary key(i))") + tk.MustExec("begin") + tk.MustExec("insert into t5 values(1, 3)") + tk.MustExec("delete from t5 where j = 3") + tk.MustExec("insert into t5 values(1, 4)") + tk.MustExec("commit") + tk.MustExec("admin check table t5") + tk.MustQuery("select * from t5").Check(testkit.Rows("1 4")) +} + func (s *testSuite4) TestDeferConstraintCheckForInsert(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec(`use test`) @@ -2574,6 +2625,32 @@ func (s *testSuite4) TestDeferConstraintCheckForInsert(c *C) { tk.MustQuery(`select * from t;`).Check(testkit.Rows("2")) } +func (s *testSuite4) TestPessimisticDeleteYourWrites(c *C) { + session1 := testkit.NewTestKitWithInit(c, s.store) + session2 := testkit.NewTestKitWithInit(c, s.store) + + session1.MustExec("drop table if exists x;") + session1.MustExec("create table x (id int primary key, c int);") + + session1.MustExec("set tidb_txn_mode = 'pessimistic'") + session2.MustExec("set tidb_txn_mode = 'pessimistic'") + + session1.MustExec("begin;") + session1.MustExec("insert into x select 1, 1") + session1.MustExec("delete from x where id = 1") + session2.MustExec("begin;") + var wg sync.WaitGroup + wg.Add(1) + go func() { + session2.MustExec("insert into x select 1, 2") + wg.Done() + }() + session1.MustExec("commit;") + wg.Wait() + session2.MustExec("commit;") + session2.MustQuery("select * from x").Check(testkit.Rows("1 2")) +} + func (s *testSuite4) TestDefEnumInsert(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/go.mod b/go.mod index f9097f587793b..5e895f7d4468b 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d + github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0 github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 github.com/pingcap/pd v1.1.0-beta.0.20191223090411-ea2b748f6ee2 diff --git a/go.sum b/go.sum index d8d50b074a3ec..c024d8da6c4d8 100644 --- a/go.sum +++ b/go.sum @@ -151,8 +151,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d h1:zTHgLr8+0LTEJmjf8yHilgmNhdrVlCN/RW7NeO8IRsE= -github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0 h1:dXXNHvDwAEN1YNgv+PSbmAdMqhb0U9Sr2/QZ/wCewSI= +github.com/pingcap/kvproto v0.0.0-20200311073257-e53d835099b0/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 h1:/BJjVyJlNKWMMrgPsbzk5Y9VPJWwHKYttj3oWxnFQ9U= diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 073ae3132f00d..444ee1d3b7cd7 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -641,7 +641,8 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { // If the operation is Insert, check if key is exists at first. var err error // no need to check insert values for pessimistic transaction. - if m.GetOp() == kvrpcpb.Op_Insert && forUpdateTS == 0 { + op := m.GetOp() + if (op == kvrpcpb.Op_Insert || op == kvrpcpb.Op_CheckNotExists) && forUpdateTS == 0 { v, err := mvcc.getValue(m.Key, startTS, kvrpcpb.IsolationLevel_SI) if err != nil { errs = append(errs, err) @@ -657,6 +658,9 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { continue } } + if op == kvrpcpb.Op_CheckNotExists { + continue + } isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i] err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock) errs = append(errs, err) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 9e55eacd9ff66..61935d8fe1e66 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -114,11 +114,12 @@ type twoPhaseCommitter struct { // maxTxnTimeUse represents max time a Txn may use (in ms) from its startTS to commitTS. // We use it to guarantee GC worker will not influence any active txn. The value // should be less than GC life time. - maxTxnTimeUse uint64 - detail unsafe.Pointer - primaryKey []byte - forUpdateTS uint64 - pessimisticTTL uint64 + maxTxnTimeUse uint64 + detail unsafe.Pointer + primaryKey []byte + forUpdateTS uint64 + pessimisticTTL uint64 + noNeedCommitKeys map[string]struct{} mu struct { sync.RWMutex @@ -197,11 +198,12 @@ func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS, func (c *twoPhaseCommitter) initKeysAndMutations() error { var ( - keys [][]byte - size int - putCnt int - delCnt int - lockCnt int + keys [][]byte + size int + putCnt int + delCnt int + lockCnt int + noNeedCommitKey = make(map[string]struct{}) ) mutations := make(map[string]*mutationEx) txn := c.txn @@ -231,13 +233,24 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { } putCnt++ } else { + var op pb.Op + if !txn.IsPessimistic() && txn.us.LookupConditionPair(k) != nil { + // delete-your-writes keys in optimistic txn need check not exists in prewrite-phase + // due to `Op_CheckNotExists` doesn't prewrite lock, so mark those keys should not be used in commit-phase. + op = pb.Op_CheckNotExists + noNeedCommitKey[string(k)] = struct{}{} + } else { + // normal delete keys in optimistic txn can be delete without not exists checking + // delete-your-writes keys in pessimistic txn can ensure must be no exists so can directly delete them + op = pb.Op_Del + delCnt++ + } mutations[string(k)] = &mutationEx{ Mutation: pb.Mutation{ - Op: pb.Op_Del, + Op: op, Key: k, }, } - delCnt++ } if c.isPessimistic { if !bytes.Equal(k, c.primaryKey) { @@ -314,6 +327,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { zap.Int("puts", putCnt), zap.Int("dels", delCnt), zap.Int("locks", lockCnt), + zap.Int("checks", len(noNeedCommitKey)), zap.Uint64("txnStartTS", txn.startTS)) } @@ -334,6 +348,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize)) c.maxTxnTimeUse = maxTxnTimeUse c.keys = keys + c.noNeedCommitKeys = noNeedCommitKey c.mutations = mutations c.lockTTL = txnLockTTL(txn.startTime, size) c.priority = getTxnPriority(txn) @@ -1147,6 +1162,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { return errors.Trace(err) } + // strip check_not_exists keys that no need to commit. + c.stripNoNeedCommitKeys() + start = time.Now() commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars)) if err != nil { @@ -1212,6 +1230,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { return nil } +func (c *twoPhaseCommitter) stripNoNeedCommitKeys() { + if len(c.noNeedCommitKeys) == 0 { + return + } + var i int + for _, k := range c.keys { + if _, ck := c.noNeedCommitKeys[string(k)]; ck { + continue + } + c.keys[i] = k + i++ + } + c.keys = c.keys[:i] +} + type schemaLeaseChecker interface { Check(txnTS uint64) error }