Skip to content

Commit

Permalink
*: fix bug that the kill command doesn't work when the killed session…
Browse files Browse the repository at this point in the history
… is waiting for the pessimistic lock (pingcap#12852)
  • Loading branch information
tiancaiamao authored and XiaTianliang committed Dec 21, 2019
1 parent d7414ed commit d9073f5
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 22 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(context.Background(), 0, idxRecord.key)
err := txn.LockKeys(context.Background(), nil, 0, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, forUpdateTS, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, keys...)
if err == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(ctx, 0, recordKey)
err := txn.LockKeys(ctx, nil, 0, recordKey)
if err != nil {
return result, err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) erro
return err
}
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, forUpdateTS, keys...)
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, keys...)
}

// LimitExec represents limit executor
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ type Transaction interface {
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, forUpdateTS uint64, keys ...Key) error
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
Expand Down
2 changes: 1 addition & 1 deletion kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (t *mockTxn) String() string {
return ""
}

func (t *mockTxn) LockKeys(_ context.Context, _ uint64, _ ...Key) error {
func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ ...Key) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (s testMockSuite) TestInterface(c *C) {

transaction, err := storage.Begin()
c.Check(err, IsNil)
err = transaction.LockKeys(context.Background(), 0, Key("lock"))
err = transaction.LockKeys(context.Background(), nil, 0, Key("lock"))
c.Check(err, IsNil)
transaction.SetOption(Option(23), struct{}{})
if mock, ok := transaction.(*mockTxn); ok {
Expand Down
29 changes: 29 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package session_test

import (
"fmt"
"sync"
"sync/atomic"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -392,3 +394,30 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk.MustExec("commit")
tk.MustQuery("select * from conflict").Check(testkit.Rows("1 3"))
}

func (s *testPessimisticSuite) TestWaitLockKill(c *C) {
// Test kill command works on waiting pessimistic lock.
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists test_kill")
tk.MustExec("create table test_kill (id int primary key, c int)")
tk.MustExec("insert test_kill values (1, 1)")
tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
tk.MustQuery("select * from test_kill where id = 1 for update")

var wg sync.WaitGroup
wg.Add(1)
go func() {
time.Sleep(500 * time.Millisecond)
sessVars := tk2.Se.GetSessionVars()
succ := atomic.CompareAndSwapUint32(&sessVars.Killed, 0, 1)
c.Assert(succ, IsTrue)
wg.Wait()
}()
_, err := tk2.Exec("update test_kill set c = c + 1 where id = 1")
wg.Done()
c.Assert(err, NotNil)
c.Assert(terror.ErrorEqual(err, tikv.ErrQueryInterrupted), IsTrue)
tk.MustExec("rollback")
}
29 changes: 22 additions & 7 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type twoPhaseCommitAction interface {
type actionPrewrite struct{}
type actionCommit struct{}
type actionCleanup struct{}
type actionPessimisticLock struct{}
type actionPessimisticLock struct{ killed *uint32 }
type actionPessimisticRollback struct{}

var (
Expand Down Expand Up @@ -643,7 +643,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
}
}

func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mut := &pb.Mutation{
Expand Down Expand Up @@ -679,7 +679,7 @@ func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
if err != nil {
return errors.Trace(err)
}
err = c.pessimisticLockKeys(bo, batch.keys)
err = c.pessimisticLockKeys(bo, action.killed, batch.keys)
return errors.Trace(err)
}
if resp.Resp == nil {
Expand Down Expand Up @@ -712,11 +712,26 @@ func (actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
}
locks = append(locks, lock)
}
_, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
var expire int64
expire, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
}
// Because we already waited on tikv, no need to Backoff here.

if err1 := bo.BackoffWithMaxSleep(BoTxnLock, int(expire), errors.New(locks[0].String())); err1 != nil {
return err1
}
// Handle the killed flag when waiting for the pessimistic lock.
// When a txn runs into LockKeys() and backoff here, it has no chance to call
// executor.Next() and check the killed flag.
if action.killed != nil {
// Do not reset the killed flag here!
// actionPessimisticLock runs on each region parallelly, we have to consider that
// the error may be dropped.
if atomic.LoadUint32(action.killed) == 1 {
return ErrQueryInterrupted
}
}
}
}

Expand Down Expand Up @@ -918,8 +933,8 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionCleanup{}, keys)
}

func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticLock{}, keys)
func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticLock{killed}, keys)
}

func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error {
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
c.Assert(txn.Set(key, key), IsNil)
txn.DelOption(kv.PresumeKeyNotExistsError)
txn.DelOption(kv.PresumeKeyNotExists)
err := txn.LockKeys(context.Background(), txn.startTS, key)
err := txn.LockKeys(context.Background(), nil, txn.startTS, key)
c.Assert(err, NotNil)
c.Assert(txn.Delete(key), IsNil)
key2 := kv.Key("key2")
Expand All @@ -518,9 +518,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
err := txn.LockKeys(context.Background(), 100, kv.Key("abc"), kv.Key("def"))
err := txn.LockKeys(context.Background(), nil, 100, kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
err = txn.LockKeys(context.Background(), 100, kv.Key("abc"), kv.Key("def"))
err = txn.LockKeys(context.Background(), nil, 100, kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
c.Assert(txn.lockKeys, HasLen, 2)
}
Expand All @@ -530,11 +530,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
time.Sleep(time.Millisecond * 100)
err := txn.LockKeys(context.Background(), txn.startTS, key)
err := txn.LockKeys(context.Background(), nil, txn.startTS, key)
c.Assert(err, IsNil)
time.Sleep(time.Millisecond * 100)
key2 := kv.Key("key2")
err = txn.LockKeys(context.Background(), txn.startTS, key2)
err = txn.LockKeys(context.Background(), nil, txn.startTS, key2)
c.Assert(err, IsNil)
lockInfo := s.getLockInfo(c, key)
elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
ErrRegionUnavailable = terror.ClassTiKV.New(mysql.ErrRegionUnavailable, mysql.MySQLErrName[mysql.ErrRegionUnavailable])
ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy])
ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly])
ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
)

// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
Expand All @@ -60,6 +61,7 @@ func init() {
mysql.ErrTiKVServerBusy: mysql.ErrTiKVServerBusy,
mysql.ErrGCTooEarly: mysql.ErrGCTooEarly,
mysql.ErrTruncatedWrongValue: mysql.ErrTruncatedWrongValue,
mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted,
}
terror.ErrClassToMySQLCodes[terror.ClassTiKV] = tikvMySQLErrCodes
}
2 changes: 1 addition & 1 deletion store/tikv/ticlient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) {
txn := s.beginTxn(c)
err := txn.Set(encodeKey(s.prefix, "key"), []byte("value"))
c.Assert(err, IsNil)
err = txn.LockKeys(context.Background(), 0, encodeKey(s.prefix, "key"))
err = txn.LockKeys(context.Background(), nil, 0, encodeKey(s.prefix, "key"))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
11 changes: 9 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/dgryski/go-farm"
Expand Down Expand Up @@ -380,7 +381,7 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error {
return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys)
}

func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput ...kv.Key) error {
func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keysInput ...kv.Key) error {
// Exclude keys that are already locked.
keys := make([][]byte, 0, len(keysInput))
txn.mu.Lock()
Expand Down Expand Up @@ -424,7 +425,13 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput
// If the number of keys greater than 1, it can be on different region,
// concurrently execute on multiple regions may lead to deadlock.
txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1
err := txn.committer.pessimisticLockKeys(bo, keys)
err := txn.committer.pessimisticLockKeys(bo, killed, keys)
if killed != nil {
// If the kill signal is received during waiting for pessimisticLock,
// pessimisticLockKeys would handle the error but it doesn't reset the flag.
// We need to reset the killed flag here.
atomic.CompareAndSwapUint32(killed, 1, 0)
}
if err != nil {
for _, key := range keys {
txn.us.DeleteKeyExistErrInfo(key)
Expand Down

0 comments on commit d9073f5

Please sign in to comment.