Skip to content

Commit

Permalink
session: fix linearizability for non-autocommit async-commit txn (#22879
Browse files Browse the repository at this point in the history
)

* fix linearizability for non-autocommit async-commit txntxn

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* add IsExplicit flag

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* set missing cases

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* remove redundant assignment

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* revert go sum change

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* fmt

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* clarify comment

Signed-off-by: Yilin Chen <sticnarf@gmail.com>

Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com>
  • Loading branch information
sticnarf and ti-chi-bot authored Feb 24, 2021
1 parent ae46cda commit b226130
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 21 deletions.
2 changes: 1 addition & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
txnCtx.InfoSchema = is
txnCtx.SchemaVersion = is.SchemaMetaVersion()
// DDL will force commit old transaction, after DDL, in transaction status should be false.
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false)
e.ctx.GetSessionVars().SetInTxn(false)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/grant.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (e *GrantExec) Next(ctx context.Context, req *chunk.Chunk) error {
if err := e.ctx.NewTxn(ctx); err != nil {
return err
}
defer func() { e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false) }()
defer func() { e.ctx.GetSessionVars().SetInTxn(false) }()

// Create internal session to start internal transaction.
isCommit := false
Expand Down
2 changes: 1 addition & 1 deletion executor/revoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (e *RevokeExec) Next(ctx context.Context, req *chunk.Chunk) error {
if err := e.ctx.NewTxn(ctx); err != nil {
return err
}
defer func() { e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false) }()
defer func() { e.ctx.GetSessionVars().SetInTxn(false) }()

// Create internal session to start internal transaction.
isCommit := false
Expand Down
10 changes: 5 additions & 5 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (e *SimpleExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
if err := e.ctx.NewTxn(ctx); err != nil {
return err
}
defer func() { e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false) }()
defer func() { e.ctx.GetSessionVars().SetInTxn(false) }()
}

switch x := e.Statement.(type) {
Expand Down Expand Up @@ -586,7 +586,7 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true)
e.ctx.GetSessionVars().SetInTxn(true)
// Call ctx.Txn(true) to active pending txn.
txnMode := s.Mode
if txnMode == "" {
Expand Down Expand Up @@ -668,7 +668,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true)
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

Expand Down Expand Up @@ -737,13 +737,13 @@ func (e *SimpleExec) executeRevokeRole(s *ast.RevokeRoleStmt) error {
}

func (e *SimpleExec) executeCommit(s *ast.CommitStmt) {
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false)
e.ctx.GetSessionVars().SetInTxn(false)
}

func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error {
sessVars := e.ctx.GetSessionVars()
logutil.BgLogger().Debug("execute rollback statement", zap.Uint64("conn", sessVars.ConnectionID))
sessVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
sessVars.SetInTxn(false)
txn, err := e.ctx.Txn(false)
if err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (s *session) doCommit(ctx context.Context) error {
}
defer func() {
s.txn.changeToInvalid()
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
s.sessionVars.SetInTxn(false)
}()
if s.txn.IsReadOnly() {
return nil
Expand Down Expand Up @@ -499,7 +499,7 @@ func (s *session) doCommit(ctx context.Context) error {
// An auto-commit transaction fetches its startTS from the TSO so its commitTS > its startTS > the commitTS
// of any previously committed transactions.
s.txn.SetOption(kv.GuaranteeLinearizability,
!s.GetSessionVars().IsAutocommit() && s.GetSessionVars().GuaranteeLinearizability)
s.GetSessionVars().TxnCtx.IsExplicit && s.GetSessionVars().GuaranteeLinearizability)
}

return s.txn.Commit(tikvutil.SetSessionID(ctx, s.GetSessionVars().ConnectionID))
Expand Down Expand Up @@ -611,7 +611,7 @@ func (s *session) RollbackTxn(ctx context.Context) {
}
s.txn.changeToInvalid()
s.sessionVars.TxnCtx.Cleanup()
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
s.sessionVars.SetInTxn(false)
}

func (s *session) GetClient() kv.Client {
Expand Down Expand Up @@ -697,7 +697,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
s.sessionVars.RetryInfo.Retrying = false
// retryCnt only increments on retryable error, so +1 here.
metrics.SessionRetry.Observe(float64(retryCnt + 1))
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
s.sessionVars.SetInTxn(false)
if err != nil {
s.RollbackTxn(ctx)
}
Expand Down Expand Up @@ -791,7 +791,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
zap.String("txn", s.txn.GoString()))
kv.BackOff(retryCnt)
s.txn.changeToInvalid()
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
s.sessionVars.SetInTxn(false)
}
return err
}
Expand Down Expand Up @@ -1902,7 +1902,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
s.txn.SetOption(kv.Pessimistic, true)
}
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
s.sessionVars.SetInTxn(true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
s.txn.SetVars(s.sessionVars.KVVars)
Expand Down
46 changes: 45 additions & 1 deletion session/session_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,52 @@ func (s *testSessionSerialSuite) TestAutoCommitNeedNotLinearizability(c *C) {

tk.Se.GetSessionVars().SetSystemVar("tidb_enable_async_commit", "1")
tk.Se.GetSessionVars().SetSystemVar("tidb_guarantee_linearizability", "1")
// Auto-commit transactions don't need to get minCommitTS from TSO
tk.MustExec("INSERT INTO t1 VALUES (1)")

tk.Se.GetSessionVars().SetSystemVar("tidb_enable_1pc", "1")
tk.MustExec("BEGIN")
tk.MustExec("INSERT INTO t1 VALUES (2)")
// An explicit transaction needs to get minCommitTS from TSO
func() {
defer func() {
err := recover()
c.Assert(err, NotNil)
}()
tk.MustExec("COMMIT")
}()

tk.MustExec("set autocommit = 0")
tk.MustExec("INSERT INTO t1 VALUES (3)")
func() {
defer func() {
err := recover()
c.Assert(err, NotNil)
}()
tk.MustExec("COMMIT")
}()

// Same for 1PC
tk.MustExec("set autocommit = 1")
tk.Se.GetSessionVars().SetSystemVar("tidb_enable_1pc", "1")
tk.MustExec("INSERT INTO t1 VALUES (4)")

tk.MustExec("BEGIN")
tk.MustExec("INSERT INTO t1 VALUES (5)")
func() {
defer func() {
err := recover()
c.Assert(err, NotNil)
}()
tk.MustExec("COMMIT")
}()

tk.MustExec("set autocommit = 0")
tk.MustExec("INSERT INTO t1 VALUES (6)")
func() {
defer func() {
err := recover()
c.Assert(err, NotNil)
}()
tk.MustExec("COMMIT")
}()
}
3 changes: 1 addition & 2 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
Expand Down Expand Up @@ -275,7 +274,7 @@ func checkStmtLimit(ctx context.Context, se *session) error {
// The last history could not be "commit"/"rollback" statement.
// It means it is impossible to start a new transaction at the end of the transaction.
// Because after the server executed "commit"/"rollback" statement, the session is out of the transaction.
sessVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
sessVars.SetInTxn(true)
}
return err
}
Expand Down
20 changes: 16 additions & 4 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,12 @@ type TransactionContext struct {
IsPessimistic bool
// IsStaleness indicates whether the txn is read only staleness txn.
IsStaleness bool
Isolation string
LockExpire uint32
ForUpdate uint32
// IsExplicit indicates whether the txn is an interactive txn, which is typically started with a BEGIN
// or START TRANSACTION statement, or by setting autocommit to 0.
IsExplicit bool
Isolation string
LockExpire uint32
ForUpdate uint32
// TxnScope indicates the value of txn_scope
TxnScope string

Expand Down Expand Up @@ -1171,6 +1174,15 @@ func (s *SessionVars) GetStatusFlag(flag uint16) bool {
return s.Status&flag > 0
}

// SetInTxn sets whether the session is in transaction.
// It also updates the IsExplicit flag in TxnCtx if val is true.
func (s *SessionVars) SetInTxn(val bool) {
s.SetStatusFlag(mysql.ServerStatusInTrans, val)
if val {
s.TxnCtx.IsExplicit = true
}
}

// InTxn returns if the session is in transaction.
func (s *SessionVars) InTxn() bool {
return s.GetStatusFlag(mysql.ServerStatusInTrans)
Expand Down Expand Up @@ -1359,7 +1371,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
isAutocommit := TiDBOptOn(val)
s.SetStatusFlag(mysql.ServerStatusAutocommit, isAutocommit)
if isAutocommit {
s.SetStatusFlag(mysql.ServerStatusInTrans, false)
s.SetInTxn(false)
}
case AutoIncrementIncrement:
// AutoIncrementIncrement is valid in [1, 65535].
Expand Down

0 comments on commit b226130

Please sign in to comment.