Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: locks key in point get executor for pessimistic transaction #10972

Merged
merged 2 commits into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ id count task operator info
Projection_3 10000.00 root or(NULL, gt(test.t.a, 1))
└─TableReader_5 10000.00 root data:TableScan_4
└─TableScan_4 10000.00 cop table:t, range:[-inf,+inf], keep order:false, stats:pseudo
explain select * from t where a = 1 for update;
id count task operator info
Point_Get_1 1.00 root table:t, handle:1
drop table if exists ta, tb;
create table ta (a varchar(20));
create table tb (a varchar(20));
Expand Down
1 change: 1 addition & 0 deletions cmd/explaintest/t/explain_easy.test
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ drop table if exists t;
create table t(a bigint primary key);
explain select * from t where a = 1 and a = 2;
explain select null or a > 1 from t;
explain select * from t where a = 1 for update;

drop table if exists ta, tb;
create table ta (a varchar(20));
Expand Down
16 changes: 9 additions & 7 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,12 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
for {
_, err = a.handleNoDelayExecutor(ctx, e)
if err != nil {
return err
// It is possible the DML has point get plan that locks the key.
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return err
}
continue
}
keys, err1 := txn.(pessimisticTxn).KeysNeedToLock()
if err1 != nil {
Expand All @@ -422,21 +427,18 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, forUpdateTS, keys...)
if err == nil {
return nil
}
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return err
}
if e == nil {
return nil
}
}
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
if err == nil {
return nil, nil
}
txnCtx := a.Ctx.GetSessionVars().TxnCtx
var newForUpdateTS uint64
if deadlock, ok := errors.Cause(err).(*tikv.ErrDeadlock); ok {
Expand Down
20 changes: 19 additions & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
idxVals: p.IndexValues,
handle: p.Handle,
startTS: startTS,
lock: p.Lock,
}
b.isSelectForUpdate = p.IsForUpdate
e.base().initCap = 1
e.base().maxChunkSize = 1
return e
Expand All @@ -60,6 +62,7 @@ type PointGetExecutor struct {
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -95,7 +98,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err1
}
if len(handleVal) == 0 {
return nil
return e.lockKeyIfNeeded(ctx, idxKey)
}
e.handle, err1 = tables.DecodeHandle(handleVal)
if err1 != nil {
Expand All @@ -122,6 +125,10 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil && !kv.ErrNotExist.Equal(err) {
return err
}
err = e.lockKeyIfNeeded(ctx, key)
if err != nil {
return err
}
if len(val) == 0 {
if e.idxInfo != nil {
return kv.ErrNotExist.GenWithStack("inconsistent extra index %s, handle %d not found in table",
Expand All @@ -132,6 +139,17 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return e.decodeRowValToChunk(val, req)
}

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
return txn.LockKeys(ctx, e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS(), kv.Key(key))
}
return nil
}

func (e *PointGetExecutor) encodeIndexKey() (_ []byte, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
for i := range e.idxVals {
Expand Down
14 changes: 13 additions & 1 deletion planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type PointGetPlan struct {
expr expression.Expression
ctx sessionctx.Context
IsTableDual bool
Lock bool
IsForUpdate bool
}

type nameValuePair struct {
Expand Down Expand Up @@ -141,6 +143,10 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
tableDual.SetSchema(fp.Schema())
return tableDual.Init(ctx, &property.StatsInfo{})
}
if x.LockTp == ast.SelectLockForUpdate {
fp.Lock = true
fp.IsForUpdate = true
}
return fp
}
case *ast.UpdateStmt:
Expand All @@ -159,7 +165,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
// 3. All the columns must be public and generated.
// 4. The condition is an access path that the range is a unique key.
func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetPlan {
if selStmt.Having != nil || selStmt.LockTp != ast.SelectLockNone {
if selStmt.Having != nil {
return nil
} else if selStmt.Limit != nil {
count, offset, err := extractLimitCountOffset(ctx, selStmt.Limit)
Expand Down Expand Up @@ -452,6 +458,9 @@ func tryUpdatePointPlan(ctx sessionctx.Context, updateStmt *ast.UpdateStmt) Plan
if fastSelect.IsTableDual {
return PhysicalTableDual{}.Init(ctx, &property.StatsInfo{})
}
if ctx.GetSessionVars().TxnCtx.IsPessimistic {
fastSelect.Lock = true
}
orderedList := buildOrderedList(ctx, fastSelect, updateStmt.List)
if orderedList == nil {
return nil
Expand Down Expand Up @@ -512,6 +521,9 @@ func tryDeletePointPlan(ctx sessionctx.Context, delStmt *ast.DeleteStmt) Plan {
if fastSelect.IsTableDual {
return PhysicalTableDual{}.Init(ctx, &property.StatsInfo{})
}
if ctx.GetSessionVars().TxnCtx.IsPessimistic {
fastSelect.Lock = true
}
delPlan := Delete{
SelectPlan: fastSelect,
}.Init(ctx)
Expand Down
44 changes: 44 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,47 @@ func (s *testPessimisticSuite) TestInsertOnDup(c *C) {
tk.MustExec("commit")
tk.MustQuery("select * from dup").Check(testkit.Rows("1 2"))
}

func (s *testPessimisticSuite) TestPointGetKeyLock(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists point")
tk.MustExec("create table point (id int primary key, u int unique, c int)")
syncCh := make(chan struct{})

tk.MustExec("begin pessimistic")
tk.MustExec("update point set c = c + 1 where id = 1")
tk.MustExec("delete from point where u = 2")
go func() {
tk2.MustExec("begin pessimistic")
_, err1 := tk2.Exec("insert point values (1, 1, 1)")
c.Check(kv.ErrKeyExists.Equal(err1), IsTrue)
_, err1 = tk2.Exec("insert point values (2, 2, 2)")
c.Check(kv.ErrKeyExists.Equal(err1), IsTrue)
tk2.MustExec("rollback")
<-syncCh
}()
time.Sleep(time.Millisecond * 10)
tk.MustExec("insert point values (1, 1, 1)")
tk.MustExec("insert point values (2, 2, 2)")
tk.MustExec("commit")
syncCh <- struct{}{}

tk.MustExec("begin pessimistic")
tk.MustExec("select * from point where id = 3 for update")
tk.MustExec("select * from point where u = 4 for update")
go func() {
tk2.MustExec("begin pessimistic")
_, err1 := tk2.Exec("insert point values (3, 3, 3)")
c.Check(kv.ErrKeyExists.Equal(err1), IsTrue)
_, err1 = tk2.Exec("insert point values (4, 4, 4)")
c.Check(kv.ErrKeyExists.Equal(err1), IsTrue)
tk2.MustExec("rollback")
<-syncCh
}()
time.Sleep(time.Millisecond * 10)
tk.MustExec("insert point values (3, 3, 3)")
tk.MustExec("insert point values (4, 4, 4)")
tk.MustExec("commit")
syncCh <- struct{}{}
}