Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix PointGet will return an stale value when tidb_enable_plan_replayer_capture is set #40197

Merged
merged 6 commits into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"runtime/trace"
"strconv"
"strings"
Expand Down Expand Up @@ -295,8 +296,12 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh

var pointExecutor *PointGetExecutor
useMaxTS := startTs == math.MaxUint64

// try to reuse point get executor
if a.PsStmt.Executor != nil {
// We should only use the cached the executor when the startTS is MaxUint64
if a.PsStmt.Executor != nil && useMaxTS {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
exec, ok := a.PsStmt.Executor.(*PointGetExecutor)
if !ok {
logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path")
Expand All @@ -306,17 +311,21 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
pointExecutor = exec
}
}
if a.PsStmt.Executor == nil {

if pointExecutor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
newExecutor := b.build(a.Plan)
pointExecutor = b.build(a.Plan).(*PointGetExecutor)
if b.err != nil {
return nil, b.err
}
a.PsStmt.Executor = newExecutor

if useMaxTS {
a.PsStmt.Executor = pointExecutor
}
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
Expand Down
5 changes: 5 additions & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
}

if err = sessiontxn.OptimizeWithPlanAndThenWarmUp(c.Ctx, stmt.Plan); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move the OptimizeWithPlanAndThenWarmUp here from ExecuteStmt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If IsPlanReplayerCaptureEnabled is true, it will invoke GetReadStmt and activate the txn. Moving OptimizeWithPlanAndThenWarmUp here is to keep the pointget still be optimized before it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add some comments about this constraint here or the possible implicit activation on the interface GetStmtReadTS and GetStmtForUpdateTS.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments

return nil, err
}

if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,3 +825,20 @@ func TestPointGetIssue25167(t *testing.T) {
tk.MustExec("insert into t values (1)")
tk.MustQuery("select * from t as of timestamp @a where a = 1").Check(testkit.Rows())
}

func TestPointGetIssue40194(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1(id int primary key, v int)")
tk.MustExec("insert into t1 values(1, 10)")
tk.MustExec("prepare s from 'select * from t1 where id=1'")
tk.MustExec("set @@tidb_enable_plan_replayer_capture=1")
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("update t1 set v=v+1")
tk.MustQuery("execute s").Check(testkit.Rows("1 11"))
}
11 changes: 7 additions & 4 deletions planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,13 @@ func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.Ta

// PlanCacheStmt store prepared ast from PrepareExec and other related fields
type PlanCacheStmt struct {
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
ColumnInfos interface{}
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
ColumnInfos interface{}
// Executor is only used for point get scene.
// Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it.
// If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here.
Executor interface{}
NormalizedSQL string
NormalizedPlan string
Expand Down
4 changes: 0 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2180,10 +2180,6 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
// Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt).
compiler := executor.Compiler{Ctx: s}
stmt, err := compiler.Compile(ctx, stmtNode)
if err == nil {
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, stmt.Plan)
}

if err != nil {
s.rollbackOnError(ctx)

Expand Down
2 changes: 2 additions & 0 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ type TxnManager interface {
// GetReadReplicaScope returns the read replica scope
GetReadReplicaScope() string
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
// Calling this method will activate the txn implicitly if current read is not stale/historical read
GetStmtReadTS() (uint64, error)
// GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
// Calling this method will activate the txn implicitly if current read is not stale/historical read
GetStmtForUpdateTS() (uint64, error)
// GetContextProvider returns the current TxnContextProvider
GetContextProvider() TxnContextProvider
Expand Down
8 changes: 7 additions & 1 deletion sessiontxn/isolation/optimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{})
return nil
}

if p.txn != nil {
// `p.txn != nil` means the txn has already been activated, we should not optimize the startTS because the startTS
// has already been used.
return nil
}

realPlan, ok := plan.(plannercore.Plan)
if !ok {
return nil
Expand Down Expand Up @@ -141,7 +147,7 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{})
zap.Uint64("conn", sessVars.ConnectionID),
zap.String("text", sessVars.StmtCtx.OriginalSQL),
)
return nil
return err
}

p.optimizeWithMaxTS = true
Expand Down