Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: simplify plan cache code #52379

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,16 +284,16 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {

// try to reuse point get executor
// We should only use the cached the executor when the startTS is MaxUint64
if a.PsStmt.Executor != nil && useMaxTS {
exec, ok := a.PsStmt.Executor.(*PointGetExecutor)
if a.PsStmt.PointGet.Executor != nil && useMaxTS {
exec, ok := a.PsStmt.PointGet.Executor.(*PointGetExecutor)
if !ok {
logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path")
a.PsStmt.Executor = nil
a.PsStmt.PointGet.Executor = nil
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
pointGetPlan := a.PsStmt.PointGet.Plan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
a.PsStmt.PointGet.Executor = exec
executor = exec
}
}
Expand All @@ -308,7 +308,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {

// Don't cache the executor for non point-get (table dual) or partitioned tables
if ok && useMaxTS && pointExecutor.partitionDefIdx == nil {
a.PsStmt.Executor = pointExecutor
a.PsStmt.PointGet.Executor = pointExecutor
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PreparedAst.CachedPlan = nil
preparedObj.PointGet.Plan = nil
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,10 @@ func (e *DeallocateExec) Next(context.Context, *chunk.Chunk) error {
if !ok {
return errors.Errorf("invalid PlanCacheStmt type")
}
prepared := preparedObj.PreparedAst
delete(vars.PreparedStmtNameToID, e.Name)
if e.Ctx().GetSessionVars().EnablePreparedPlanCache {
bindSQL, _ := bindinfo.MatchSQLBindingForPlanCache(e.Ctx(), preparedObj.PreparedAst.Stmt, &preparedObj.BindingInfo)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion,
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, preparedObj.SchemaVersion,
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
return err
Expand Down
8 changes: 2 additions & 6 deletions pkg/parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,12 +552,8 @@ func (n *DeallocateStmt) Accept(v Visitor) (Node, bool) {

// Prepared represents a prepared statement.
type Prepared struct {
Stmt StmtNode
StmtType string
Params []ParamMarkerExpr
SchemaVersion int64
CachedPlan interface{}
CachedNames interface{}
Stmt StmtNode
StmtType string
}

// ExecuteStmt is a statement to execute PreparedStmt.
Expand Down
55 changes: 26 additions & 29 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,24 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
vars.StmtCtx.StmtType = stmtAst.StmtType

// step 1: check parameter number
if len(stmtAst.Params) != len(params) {
if len(stmt.Params) != len(params) {
return errors.Trace(plannererrors.ErrWrongParamCount)
}

// step 2: set parameter values
if err := SetParameterValuesIntoSCtx(sctx.GetPlanCtx(), isNonPrepared, stmtAst.Params, params); err != nil {
if err := SetParameterValuesIntoSCtx(sctx.GetPlanCtx(), isNonPrepared, stmt.Params, params); err != nil {
return errors.Trace(err)
}

// step 3: check schema version
if stmtAst.SchemaVersion != is.SchemaMetaVersion() {
if stmt.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
// Cached plan in prepared struct does NOT have a "cache key" with
// schema version like prepared plan cache key
stmtAst.CachedPlan = nil
stmt.Executor = nil
stmt.ColumnInfos = nil
stmt.PointGet.Plan = nil
stmt.PointGet.Executor = nil
stmt.PointGet.ColumnInfos = nil
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
// Example:
Expand All @@ -124,7 +124,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
if err != nil {
return plannererrors.ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
stmtAst.SchemaVersion = is.SchemaMetaVersion()
stmt.SchemaVersion = is.SchemaMetaVersion()
}

// step 4: handle expiration
Expand All @@ -135,7 +135,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC()
if stmt.StmtCacheable && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 {
sctx.GetSessionPlanCache().DeleteAll()
stmtAst.CachedPlan = nil
stmt.PointGet.Plan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}
return nil
Expand All @@ -155,7 +155,6 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
var cacheKey kvcache.Key
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
stmtAst := stmt.PreparedAst
cacheEnabled := false
if isNonPrepared {
stmtCtx.CacheType = stmtctx.SessionNonPrepared
Expand Down Expand Up @@ -190,13 +189,13 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
latestSchemaVersion = domain.GetDomain(sctx).InfoSchema().SchemaMetaVersion()
}
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), stmt.StmtText,
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
stmt.StmtDB, stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
return nil, nil, err
}
}

if stmtCtx.UseCache && stmtAst.CachedPlan != nil { // special code path for fast point plan
if plan, names, ok, err := getCachedPointPlan(stmtAst, sessVars, stmtCtx); ok {
if stmtCtx.UseCache && stmt.PointGet.Plan != nil { // special code path for fast point plan
if plan, names, ok, err := getCachedPointPlan(stmt, sessVars, stmtCtx); ok {
return plan, names, err
}
}
Expand Down Expand Up @@ -234,15 +233,15 @@ func parseParamTypes(sctx sessionctx.Context, params []expression.Expression) (p
return
}

func getCachedPointPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmtCtx *stmtctx.StatementContext) (Plan,
func getCachedPointPlan(stmt *PlanCacheStmt, sessVars *variable.SessionVars, stmtCtx *stmtctx.StatementContext) (Plan,
[]*types.FieldName, bool, error) {
// short path for point-get plans
// Rewriting the expression in the select.where condition will convert its
// type from "paramMarker" to "Constant".When Point Select queries are executed,
// the expression in the where condition will not be evaluated,
// so you don't need to consider whether prepared.useCache is enabled.
plan := stmt.CachedPlan.(Plan)
names := stmt.CachedNames.(types.NameSlice)
plan := stmt.PointGet.Plan.(Plan)
names := stmt.PointGet.ColumnNames.(types.NameSlice)
if !RebuildPlan4CachedPlan(plan) {
return nil, nil, false, nil
}
Expand Down Expand Up @@ -335,7 +334,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
if cacheKey, err = NewPlanCacheKey(sessVars, stmt.StmtText, stmt.StmtDB,
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
return nil, nil, err
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
Expand Down Expand Up @@ -777,9 +776,8 @@ func tryCachePointPlan(_ context.Context, sctx PlanContext,
return nil
}
var (
stmtAst = stmt.PreparedAst
ok bool
err error
ok bool
err error
)

if plan, _ok := p.(*PointGetPlan); _ok {
Expand All @@ -794,8 +792,8 @@ func tryCachePointPlan(_ context.Context, sctx PlanContext,

if ok {
// just cache point plan now
stmtAst.CachedPlan = p
stmtAst.CachedNames = names
stmt.PointGet.Plan = p
stmt.PointGet.ColumnNames = names
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
sctx.GetSessionVars().StmtCtx.SetPlan(p)
sctx.GetSessionVars().StmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
Expand All @@ -807,32 +805,31 @@ func tryCachePointPlan(_ context.Context, sctx PlanContext,
// Be careful with the short path, current precondition is ths cached plan satisfying
// IsPointGetWithPKOrUniqueKeyByAutoCommit
func IsPointGetPlanShortPathOK(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) (bool, error) {
stmtAst := stmt.PreparedAst
if stmtAst.CachedPlan == nil || staleread.IsStmtStaleness(sctx) {
if stmt.PointGet.Plan == nil || staleread.IsStmtStaleness(sctx) {
return false, nil
}
// check auto commit
if !IsAutoCommitTxn(sctx.GetSessionVars()) {
return false, nil
}
if stmtAst.SchemaVersion != is.SchemaMetaVersion() {
stmtAst.CachedPlan = nil
stmt.ColumnInfos = nil
if stmt.SchemaVersion != is.SchemaMetaVersion() {
stmt.PointGet.Plan = nil
stmt.PointGet.ColumnInfos = nil
return false, nil
}
// maybe we'd better check cached plan type here, current
// only point select/update will be cached, see "getPhysicalPlan" func
var ok bool
var err error
switch stmtAst.CachedPlan.(type) {
switch stmt.PointGet.Plan.(type) {
case *PointGetPlan:
ok = true
case *Update:
pointUpdate := stmtAst.CachedPlan.(*Update)
pointUpdate := stmt.PointGet.Plan.(*Update)
_, ok = pointUpdate.SelectPlan.(*PointGetPlan)
if !ok {
err = errors.Errorf("cached update plan not point update")
stmtAst.CachedPlan = nil
stmt.PointGet.Plan = nil
return false, err
}
default:
Expand Down
38 changes: 24 additions & 14 deletions pkg/planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,8 @@ func GeneratePlanCacheStmtWithAST(ctx context.Context, sctx sessionctx.Context,
}

prepared := &ast.Prepared{
Stmt: paramStmt,
StmtType: ast.GetStmtLabel(paramStmt),
Params: extractor.markers,
SchemaVersion: ret.InfoSchema.SchemaMetaVersion(),
Stmt: paramStmt,
StmtType: ast.GetStmtLabel(paramStmt),
}
normalizedSQL, digest := parser.NormalizeDigest(prepared.Stmt.Text())

Expand Down Expand Up @@ -158,8 +156,8 @@ func GeneratePlanCacheStmtWithAST(ctx context.Context, sctx sessionctx.Context,
// parameters are unknown here, so regard them all as NULL.
// For non-prepared statements, all parameters are already initialized at `ParameterizeAST`, so no need to set NULL.
if isPrepStmt {
for i := range prepared.Params {
param := prepared.Params[i].(*driver.ParamMarkerExpr)
for i := range extractor.markers {
param := extractor.markers[i].(*driver.ParamMarkerExpr)
param.Datum.SetNull()
param.InExecute = false
}
Expand Down Expand Up @@ -194,6 +192,8 @@ func GeneratePlanCacheStmtWithAST(ctx context.Context, sctx sessionctx.Context,
StmtCacheable: cacheable,
UncacheableReason: reason,
QueryFeatures: features,
SchemaVersion: ret.InfoSchema.SchemaMetaVersion(),
Params: extractor.markers,
}
if err = CheckPreparedPriv(sctx, preparedObj, ret.InfoSchema); err != nil {
return nil, nil, 0, err
Expand Down Expand Up @@ -443,14 +443,24 @@ func (*PlanCacheQueryFeatures) Leave(in ast.Node) (out ast.Node, ok bool) {

// PlanCacheStmt store prepared ast from PrepareExec and other related fields
type PlanCacheStmt struct {
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
ColumnInfos any
// Executor is only used for point get scene.
// Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it.
// If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here.
Executor any
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
Params []ast.ParamMarkerExpr
SchemaVersion int64

// To improve the performance of PointGet further, directly cache PointGet execution info here like Executor, ColumnNames and etc.
// Use any to avoid cycle import.
// TODO: this implementation is risky and tricky to the optimizer, refactor it.
PointGet struct {
ColumnInfos any
ColumnNames any
// Executor is only used for point get scene.
// Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it.
// If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here.
Executor any
Plan any // the cached PointGetPlan
}

StmtCacheable bool // Whether this stmt is cacheable.
UncacheableReason string // Why this stmt is uncacheable.
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (ts *TiDBStatement) Close() error {
}
bindSQL, _ := bindinfo.MatchSQLBindingForPlanCache(ts.ctx, preparedObj.PreparedAst.Stmt, &preparedObj.BindingInfo)
cacheKey, err := core.NewPlanCacheKey(ts.ctx.GetSessionVars(), preparedObj.StmtText, preparedObj.StmtDB,
preparedObj.PreparedAst.SchemaVersion, 0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
preparedObj.SchemaVersion, 0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/internal/resultset/resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (trs *tidbResultSet) Columns() []*column.Info {
// for prepare statement, try to get cached columnInfo array
if trs.preparedStmt != nil {
ps := trs.preparedStmt
if colInfos, ok := ps.ColumnInfos.([]*column.Info); ok {
if colInfos, ok := ps.PointGet.ColumnInfos.([]*column.Info); ok {
trs.columns = colInfos
}
}
Expand All @@ -111,7 +111,7 @@ func (trs *tidbResultSet) Columns() []*column.Info {
if trs.preparedStmt != nil {
// if Info struct has allocated object,
// here maybe we need deep copy Info to do caching
trs.preparedStmt.ColumnInfos = trs.columns
trs.preparedStmt.PointGet.ColumnInfos = trs.columns
}
}
return trs.columns
Expand Down
11 changes: 5 additions & 6 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,16 @@ func (s *session) cleanRetryInfo() {
planCacheEnabled := s.GetSessionVars().EnablePreparedPlanCache
var cacheKey kvcache.Key
var err error
var preparedAst *ast.Prepared
var preparedObj *plannercore.PlanCacheStmt
var stmtText, stmtDB string
if planCacheEnabled {
firstStmtID := retryInfo.DroppedPreparedStmtIDs[0]
if preparedPointer, ok := s.sessionVars.PreparedStmts[firstStmtID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.PlanCacheStmt)
preparedObj, ok = preparedPointer.(*plannercore.PlanCacheStmt)
if ok {
preparedAst = preparedObj.PreparedAst
stmtText, stmtDB = preparedObj.StmtText, preparedObj.StmtDB
bindSQL, _ := bindinfo.MatchSQLBindingForPlanCache(s.pctx, preparedObj.PreparedAst.Stmt, &preparedObj.BindingInfo)
cacheKey, err = plannercore.NewPlanCacheKey(s.sessionVars, stmtText, stmtDB, preparedAst.SchemaVersion,
cacheKey, err = plannercore.NewPlanCacheKey(s.sessionVars, stmtText, stmtDB, preparedObj.SchemaVersion,
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
logutil.Logger(s.currentCtx).Warn("clean cached plan failed", zap.Error(err))
Expand All @@ -319,8 +318,8 @@ func (s *session) cleanRetryInfo() {
}
for i, stmtID := range retryInfo.DroppedPreparedStmtIDs {
if planCacheEnabled {
if i > 0 && preparedAst != nil {
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtText, preparedAst.SchemaVersion, s.sessionVars.IsolationReadEngines)
if i > 0 && preparedObj != nil {
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtText, preparedObj.SchemaVersion, s.sessionVars.IsolationReadEngines)
}
if !s.sessionVars.IgnorePreparedCacheCloseStmt { // keep the plan in cache
s.GetSessionPlanCache().Delete(cacheKey)
Expand Down