Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix update panic when update in prepare and execute #26759

Merged
merged 35 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ccf66fd
fix update panic when update in prepare and execute
AilinKid Jul 30, 2021
e66e134
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Jul 30, 2021
0b94fdc
fmt
AilinKid Jul 30, 2021
47b7c9a
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 2, 2021
843fabe
remove risky code
AilinKid Aug 2, 2021
0644de4
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 2, 2021
9ba22b5
remote unnecessary code
AilinKid Aug 2, 2021
91127a1
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 2, 2021
39bdf6f
fmt
AilinKid Aug 2, 2021
7f75aac
resolve point get exit condition
AilinKid Aug 2, 2021
d04b0dd
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 2, 2021
dbe84bf
remove controversial load schema in Execute
AilinKid Aug 2, 2021
525a740
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 2, 2021
b358909
fix test
AilinKid Aug 3, 2021
f5d7f0a
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 3, 2021
1d5e335
fix test
AilinKid Aug 3, 2021
13e0a6f
fix test
AilinKid Aug 3, 2021
d6ec84b
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 3, 2021
4548083
fix the text-protocol execute statement could't see schema change
AilinKid Aug 3, 2021
44146c4
make fmt
AilinKid Aug 3, 2021
3be9a6f
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 3, 2021
3f0de0e
comment
AilinKid Aug 3, 2021
e9227ed
import
AilinKid Aug 3, 2021
2959ab5
refactor the logic into process
AilinKid Aug 4, 2021
48b66e4
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 4, 2021
4dd7843
refactor the logic into process
AilinKid Aug 4, 2021
8086a16
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 4, 2021
42b381b
make fmt
AilinKid Aug 4, 2021
d41aa35
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 4, 2021
5a20e18
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 4, 2021
2e1d6da
remove the TypeExecute enum type
AilinKid Aug 4, 2021
52f774d
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 4, 2021
963f737
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 5, 2021
ac14ea0
rename func
AilinKid Aug 5, 2021
da070d7
Merge branch 'master' into fix-update-in-prepare-execute
AilinKid Aug 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,8 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
if b.err != nil {
return nil
}
b.err = plannercore.CheckUpdateList(assignFlag, v)
// should use the new tblID2table, since the update's schema may have been changed in Execstmt.
b.err = plannercore.CheckUpdateList(assignFlag, v, tblID2table)
if b.err != nil {
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}

ret := &plannercore.PreprocessorReturn{}
if err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret)); err != nil {
pe := &plannercore.PreprocessExecuteISUpdate{ExecuteInfoSchemaUpdate: planner.GetExecuteForUpdateReadIS, Node: stmtNode}
err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret), plannercore.WithExecuteInfoSchemaUpdate(pe))
if err != nil {
return nil, err
}
stmtNode = plannercore.TryAddExtraLimit(c.Ctx, stmtNode)
Expand Down
4 changes: 4 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
preparedObj.Executor = nil
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
// Example:
// When running update in prepared statement's schema version distinguished from the one of execute statement
// We should reset the tableRefs in the prepared update statements, otherwise, the ast nodes still hold the old
// tableRefs columnInfo which will cause chaos in logic of trying point get plan. (should ban non-public column)
ret := &PreprocessorReturn{InfoSchema: is}
err := Preprocess(sctx, prepared.Stmt, InPrepare, WithPreprocessorReturn(ret))
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,16 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida
return candidates
}

func (ds *DataSource) isPointGetConvertableSchema() bool {
for _, col := range ds.Columns {
// Only handle tables that all columns are public.
if col.State != model.StatePublic {
return false
}
}
return true
}

// findBestTask implements the PhysicalPlan interface.
// It will enumerate all the available indices and choose a plan with least cost.
func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp) (t task, cntPlan int64, err error) {
Expand Down Expand Up @@ -745,7 +755,7 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
p: dual,
}, cntPlan, nil
}
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if executing normal statements without prepare-execute model? It affects those statements now.

Copy link
Contributor Author

@AilinKid AilinKid Aug 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's ok, it's is a normal limitation.

if canConvertPointGet && !path.IsIntHandlePath {
// We simply do not build [batch] point get for prefix indexes. This can be optimized.
canConvertPointGet = path.Index.Unique && !path.Index.HasPrefixIndex()
Expand Down
4 changes: 2 additions & 2 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4655,10 +4655,10 @@ type tblUpdateInfo struct {
}

// CheckUpdateList checks all related columns in updatable state.
func CheckUpdateList(assignFlags []int, updt *Update) error {
func CheckUpdateList(assignFlags []int, updt *Update, newTblID2Table map[int64]table.Table) error {
updateFromOtherAlias := make(map[int64]tblUpdateInfo)
for _, content := range updt.TblColPosInfos {
tbl := updt.tblID2Table[content.TblID]
tbl := newTblID2Table[content.TblID]
flags := assignFlags[content.Start:content.End]
var update, updatePK bool
for i, col := range tbl.WritableCols() {
Expand Down
26 changes: 24 additions & 2 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt {
}
}

// WithExecuteInfoSchemaUpdate return a PreprocessOpt to update the `Execute` infoSchema under some conditions.
func WithExecuteInfoSchemaUpdate(pe *PreprocessExecuteISUpdate) PreprocessOpt {
return func(p *preprocessor) {
p.PreprocessExecuteISUpdate = pe
}
}

// TryAddExtraLimit trys to add an extra limit for SELECT or UNION statement when sql_select_limit is set.
func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode {
if ctx.GetSessionVars().SelectLimit == math.MaxUint64 || ctx.GetSessionVars().InRestrictedSQL {
Expand Down Expand Up @@ -143,6 +150,12 @@ type PreprocessorReturn struct {
TxnScope string
}

// PreprocessExecuteISUpdate is used to update information schema for special Execute statement in the preprocessor.
type PreprocessExecuteISUpdate struct {
ExecuteInfoSchemaUpdate func(node ast.Node, sctx sessionctx.Context) infoschema.InfoSchema
Node ast.Node
}

// preprocessor is an ast.Visitor that preprocess
// ast Nodes parsed from parser.
type preprocessor struct {
Expand All @@ -157,6 +170,7 @@ type preprocessor struct {

// values that may be returned
*PreprocessorReturn
*PreprocessExecuteISUpdate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not name it Preprocessor like other members?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is PreprocessExecuteISUpdate necessary? How about just call IsExecuteForUpdateRead in ensureInfoSchema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import circle problem, pass function poninter here is trying to do this

err error
}

Expand Down Expand Up @@ -1596,9 +1610,17 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
// - session variable
// - transaction context
func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema {
if p.InfoSchema == nil {
p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema)
if p.InfoSchema != nil {
return p.InfoSchema
}
// `Execute` under some conditions need to see the latest information schema.
if p.PreprocessExecuteISUpdate != nil {
if newInfoSchema := p.ExecuteInfoSchemaUpdate(p.Node, p.ctx); newInfoSchema != nil {
p.InfoSchema = newInfoSchema
return p.InfoSchema
}
}
p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema)
return p.InfoSchema
}

Expand Down
30 changes: 18 additions & 12 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ func IsReadOnly(node ast.Node, vars *variable.SessionVars) bool {
return ast.IsReadOnly(node)
}

// GetExecuteForUpdateReadIS is used to check whether the statement is `execute` and target statement has a forUpdateRead flag.
// If so, we will return the latest information schema.
func GetExecuteForUpdateReadIS(node ast.Node, sctx sessionctx.Context) infoschema.InfoSchema {
if execStmt, isExecStmt := node.(*ast.ExecuteStmt); isExecStmt {
vars := sctx.GetSessionVars()
execID := execStmt.ExecID
if execStmt.Name != "" {
execID = vars.PreparedStmtNameToID[execStmt.Name]
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead {
return domain.GetDomain(sctx).InfoSchema()
}
}
}
return nil
}

// Optimize does optimization and creates a Plan.
// The node must be prepared first.
func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (plannercore.Plan, types.NameSlice, error) {
Expand Down Expand Up @@ -318,18 +336,6 @@ func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
sctx.GetSessionVars().RewritePhaseInfo.DurationRewrite = time.Since(beginRewrite)

if execPlan, ok := p.(*plannercore.Execute); ok {
execID := execPlan.ExecID
if execPlan.Name != "" {
execID = sctx.GetSessionVars().PreparedStmtNameToID[execPlan.Name]
}
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[execID]; ok {
if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead {
is = domain.GetDomain(sctx).InfoSchema()
}
}
}

sctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo()
activeRoles := sctx.GetSessionVars().ActiveRoles
// Check privilege. Maybe it's better to move this to the Preprocess, but
Expand Down