Skip to content

Commit

Permalink
*: Introduce OptimisticTxnContextProvider for optimistic txn (#35131)
Browse files Browse the repository at this point in the history
close #35130
  • Loading branch information
lcwangchao authored Jun 20, 2022
1 parent 2a45f21 commit 2e1e3f1
Show file tree
Hide file tree
Showing 28 changed files with 825 additions and 325 deletions.
8 changes: 4 additions & 4 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func TestReorg(t *testing.T) {
require.Equal(t, ctx.Value(testCtxKey), 1)
ctx.ClearValue(testCtxKey)

err = ctx.NewTxn(context.Background())
err = sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
txn, err := ctx.Txn(true)
require.NoError(t, err)
Expand All @@ -568,7 +568,7 @@ func TestReorg(t *testing.T) {
err = txn.Rollback()
require.NoError(t, err)

err = ctx.NewTxn(context.Background())
err = sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
txn, err = ctx.Txn(true)
require.NoError(t, err)
Expand All @@ -583,7 +583,7 @@ func TestReorg(t *testing.T) {
ID: 1,
SnapshotVer: 1, // Make sure it is not zero. So the reorgInfo's first is false.
}
err = ctx.NewTxn(context.Background())
err = sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
txn, err = ctx.Txn(true)
require.NoError(t, err)
Expand Down Expand Up @@ -614,7 +614,7 @@ func TestReorg(t *testing.T) {
// Test whether reorgInfo's Handle is update.
err = txn.Commit(context.Background())
require.NoError(t, err)
err = ctx.NewTxn(context.Background())
err = sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)

m = meta.NewMeta(txn)
Expand Down
31 changes: 3 additions & 28 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"context"
"fmt"
"math"
"runtime/trace"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -239,8 +238,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
})

ctx = a.observeStmtBeginForTopSQL(ctx)
startTs := uint64(math.MaxUint64)
err := a.Ctx.InitTxnWithStartTS(startTs)
startTs, err := sessiontxn.GetTxnManager(a.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -832,31 +830,8 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx := a.Ctx
stmtCtx := ctx.GetSessionVars().StmtCtx
if _, ok := a.Plan.(*plannercore.Execute); !ok {
if snapshotTS := ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
if err := ctx.InitTxnWithStartTS(snapshotTS); err != nil {
return nil, err
}
} else {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
useMaxTS, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, a.Plan)
if err != nil {
return nil, err
}
if useMaxTS {
logutil.BgLogger().Debug("init txnStartTS with MaxUint64", zap.Uint64("conn", ctx.GetSessionVars().ConnectionID), zap.String("text", a.Text))
if err := ctx.InitTxnWithStartTS(math.MaxUint64); err != nil {
return nil, err
}
}
if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority {
switch {
case useMaxTS:
stmtCtx.Priority = kv.PriorityHigh
case a.LowerPriority:
stmtCtx.Priority = kv.PriorityLow
}
}
if stmtCtx.Priority == mysql.NoPriority && a.LowerPriority {
stmtCtx.Priority = kv.PriorityLow
}
}
if _, ok := a.Plan.(*plannercore.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
Expand Down
16 changes: 0 additions & 16 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,22 +285,6 @@ func (e *ExecuteExec) Next(ctx context.Context, req *chunk.Chunk) error {
// Build builds a prepared statement into an executor.
// After Build, e.StmtExec will be used to do the real execution.
func (e *ExecuteExec) Build(b *executorBuilder) error {
if snapshotTS := e.ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
if err := e.ctx.InitTxnWithStartTS(snapshotTS); err != nil {
return err
}
} else {
ok, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(e.ctx, e.plan)
if err != nil {
return err
}
if ok {
err = e.ctx.InitTxnWithStartTS(math.MaxUint64)
if err != nil {
return err
}
}
}
stmtExec := b.build(e.plan)
if b.err != nil {
log.Warn("rebuild plan in EXECUTE statement failed", zap.String("labelName of PREPARE statement", e.name))
Expand Down
1 change: 0 additions & 1 deletion planner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
"//metrics",
"//parser",
"//parser/ast",
"//parser/model",
"//planner/cascades",
"//planner/core",
"//privilege",
Expand Down
1 change: 0 additions & 1 deletion planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ go_library(
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//sessiontxn",
"//sessiontxn/legacy",
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle",
Expand Down
18 changes: 1 addition & 17 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/legacy"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/temptable"
Expand Down Expand Up @@ -124,9 +123,6 @@ func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...Preproce
node.Accept(&v)
// InfoSchema must be non-nil after preprocessing
v.ensureInfoSchema()

v.initTxnContextProviderIfNecessary(node)

return errors.Trace(v.err)
}

Expand Down Expand Up @@ -1689,22 +1685,10 @@ func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema {
return p.InfoSchema
}

p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema)
p.InfoSchema = sessiontxn.GetTxnManager(p.ctx).GetTxnInfoSchema()
return p.InfoSchema
}

func (p *preprocessor) initTxnContextProviderIfNecessary(node ast.Node) {
if p.err != nil || p.flag&initTxnContextProvider == 0 {
return
}

if provider, ok := sessiontxn.GetTxnManager(p.ctx).GetContextProvider().(*legacy.SimpleTxnContextProvider); ok {
// When the current provider is `legacy.SimpleTxnContextProvider` it should to keep the logic equals to the old implement.
// After refactoring, the `legacy.SimpleTxnContextProvider` will be removed, and this code will be removed too.
provider.InfoSchema = p.ensureInfoSchema()
}
}

func (p *preprocessor) hasAutoConvertWarning(colDef *ast.ColumnDef) bool {
sessVars := p.ctx.GetSessionVars()
if !sessVars.SQLMode.HasStrictMode() && colDef.Tp.GetType() == mysql.TypeVarchar {
Expand Down
4 changes: 4 additions & 0 deletions planner/funcdep/extract_fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ func TestFDSet_ExtractFD(t *testing.T) {
is := testGetIS(t, tk.Session())
for i, tt := range tests {
comment := fmt.Sprintf("case:%v sql:%s", i, tt.sql)
require.NoError(t, tk.Session().PrepareTxnCtx(context.TODO()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).OnStmtStart(context.TODO()))
stmt, err := par.ParseOneStmt(tt.sql, "", "")
require.NoError(t, err, comment)
tk.Session().GetSessionVars().PlanID = 0
Expand Down Expand Up @@ -309,6 +311,8 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
ctx := context.TODO()
is := testGetIS(t, tk.Session())
for i, tt := range tests {
require.NoError(t, tk.Session().PrepareTxnCtx(context.TODO()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).OnStmtStart(context.TODO()))
comment := fmt.Sprintf("case:%v sql:%s", i, tt.sql)
stmt, err := par.ParseOneStmt(tt.sql, "", "")
require.NoError(t, err, comment)
Expand Down
29 changes: 0 additions & 29 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/planner/cascades"
"github.com/pingcap/tidb/planner/core"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -124,11 +123,6 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
fp = plannercore.TryFastPlan(sctx, node)
}
if fp != nil {
if !useMaxTS(sctx, fp) {
if err := txnManger.AdviseWarmup(); err != nil {
return nil, nil, err
}
}
return fp, fp.OutputNames(), nil
}
}
Expand Down Expand Up @@ -487,29 +481,6 @@ func handleEvolveTasks(ctx context.Context, sctx sessionctx.Context, br *bindinf
globalHandle.AddEvolvePlanTask(br.OriginalSQL, br.Db, binding)
}

// useMaxTS returns true when meets following conditions:
// 1. ctx is auto commit tagged.
// 2. plan is point get by pk.
// 3. not a cache table.
func useMaxTS(ctx sessionctx.Context, p plannercore.Plan) bool {
if !plannercore.IsAutoCommitTxn(ctx) {
return false
}
v, ok := p.(*plannercore.PointGetPlan)
if !ok {
return false
}
noSecondRead := v.IndexInfo == nil || (v.IndexInfo.Primary && v.TblInfo.IsCommonHandle)
if !noSecondRead {
return false
}

if v.TblInfo != nil && (v.TblInfo.TableCacheStatusType != model.TableCacheStatusDisable) {
return false
}
return true
}

// OptimizeExecStmt to optimize prepare statement protocol "execute" statement
// this is a short path ONLY does things filling prepare related params
// for point select like plan which does not need extra things
Expand Down
Loading

0 comments on commit 2e1e3f1

Please sign in to comment.