Skip to content

Commit

Permalink
*: directly save prepare execute args as datums in binary proto (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and jackysp committed Jul 2, 2019
1 parent 5886bb9 commit cb23b52
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 165 deletions.
8 changes: 3 additions & 5 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -265,15 +266,12 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx sessionctx.Context, ID uint32, args ...interface{}) (sqlexec.Statement, error) {
func CompileExecutePreparedStmt(ctx sessionctx.Context, ID uint32, args []types.Datum) (sqlexec.Statement, error) {
execStmt := &ast.ExecuteStmt{ExecID: ID}
if err := ResetContextOfStmt(ctx, execStmt); err != nil {
return nil, err
}
execStmt.UsingVars = make([]ast.ExprNode, len(args))
for i, val := range args {
execStmt.UsingVars[i] = ast.NewValueExpr(val)
}
execStmt.BinaryArgs = args
is := GetInfoSchema(ctx)
execPlan, err := planner.Optimize(ctx, execStmt, is)
if err != nil {
Expand Down
29 changes: 15 additions & 14 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testkit"
dto "github.com/prometheus/client_model/go"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func (s *seqTestSuite) TestPrepared(c *C) {
query := "select c1, c2 from prepare_test where c1 = ?"
stmtID, _, _, err := tk.Se.PrepareStmt(query)
c.Assert(err, IsNil)
rs, err := tk.Se.ExecutePreparedStmt(ctx, stmtID, 1)
rs, err := tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("1 <nil>"))

Expand All @@ -93,29 +94,29 @@ func (s *seqTestSuite) TestPrepared(c *C) {
c.Assert(err, IsNil)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk1.MustExec("insert prepare_test (c1) values (3)")
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 3)
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(3)})
c.Assert(err, IsNil)
tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("3"))

tk.MustExec("delete from prepare_test")
query = "select c1 from prepare_test where c1 = (select c1 from prepare_test where c1 = ?)"
stmtID, _, _, err = tk.Se.PrepareStmt(query)
c.Assert(err, IsNil)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 3)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(3)})
c.Assert(err, IsNil)
tk1.MustExec("insert prepare_test (c1) values (3)")
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 3)
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(3)})
c.Assert(err, IsNil)
tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("3"))

tk.MustExec("delete from prepare_test")
query = "select c1 from prepare_test where c1 in (select c1 from prepare_test where c1 = ?)"
stmtID, _, _, err = tk.Se.PrepareStmt(query)
c.Assert(err, IsNil)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 3)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(3)})
c.Assert(err, IsNil)
tk1.MustExec("insert prepare_test (c1) values (3)")
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 3)
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(3)})
c.Assert(err, IsNil)
tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows("3"))

Expand All @@ -125,12 +126,12 @@ func (s *seqTestSuite) TestPrepared(c *C) {
stmtID, _, _, err = tk.Se.PrepareStmt(query)
c.Assert(err, IsNil)
tk.MustExec("rollback")
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 4)
rs, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(4)})
c.Assert(err, IsNil)
tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows())

// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
stmt, err := executor.CompileExecutePreparedStmt(tk.Se, stmtID, 1)
stmt, err := executor.CompileExecutePreparedStmt(tk.Se, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
c.Assert(stmt.OriginText(), Equals, query)

Expand All @@ -150,7 +151,7 @@ func (s *seqTestSuite) TestPrepared(c *C) {
tk.Exec("create table prepare2 (a int)")

// Should success as the changed schema do not affect the prepared statement.
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 1)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)

// Drop a column so the prepared statement become invalid.
Expand All @@ -159,11 +160,11 @@ func (s *seqTestSuite) TestPrepared(c *C) {
c.Assert(err, IsNil)
tk.MustExec("alter table prepare_test drop column c2")

_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 1)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(plannercore.ErrUnknownColumn.Equal(err), IsTrue)

tk.MustExec("drop table prepare_test")
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 1)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(plannercore.ErrSchemaChanged.Equal(err), IsTrue)

// issue 3381
Expand Down Expand Up @@ -233,11 +234,11 @@ func (s *seqTestSuite) TestPrepared(c *C) {
// issue 8065
stmtID, _, _, err = tk.Se.PrepareStmt("select ? from dual")
c.Assert(err, IsNil)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 1)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
stmtID, _, _, err = tk.Se.PrepareStmt("update prepare1 set a = ? where a = ?")
c.Assert(err, IsNil)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 1, 1)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1), types.NewDatum(1)})
c.Assert(err, IsNil)
}
}
Expand Down Expand Up @@ -282,7 +283,7 @@ func (s *seqTestSuite) TestPreparedLimitOffset(c *C) {

stmtID, _, _, err := tk.Se.PrepareStmt("select id from prepare_test limit ?")
c.Assert(err, IsNil)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, 1)
_, err = tk.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
}
}
Expand Down
49 changes: 33 additions & 16 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,12 @@ type Prepare struct {
type Execute struct {
baseSchemaProducer

Name string
UsingVars []expression.Expression
ExecID uint32
Stmt ast.StmtNode
Plan Plan
Name string
UsingVars []expression.Expression
PrepareParams []types.Datum
ExecID uint32
Stmt ast.StmtNode
Plan Plan
}

// OptimizePreparedPlan optimizes the prepared statement.
Expand All @@ -169,20 +170,36 @@ func (e *Execute) OptimizePreparedPlan(ctx sessionctx.Context, is infoschema.Inf
return errors.Trace(ErrStmtNotFound)
}

if len(prepared.Params) != len(e.UsingVars) {
return errors.Trace(ErrWrongParamCount)
}
paramLen := len(e.PrepareParams)
if paramLen > 0 {
// for binary protocol execute, argument is placed in vars.PrepareParams
if len(prepared.Params) != paramLen {
return errors.Trace(ErrWrongParamCount)
}
vars.PreparedParams = e.PrepareParams
for i, val := range vars.PreparedParams {
param := prepared.Params[i].(*driver.ParamMarkerExpr)
param.Datum = val
param.InExecute = true
}
} else {
// for `execute stmt using @a, @b, @c`, using value in e.UsingVars
if len(prepared.Params) != len(e.UsingVars) {
return errors.Trace(ErrWrongParamCount)
}

for i, usingVar := range e.UsingVars {
val, err := usingVar.Eval(chunk.Row{})
if err != nil {
return err
for i, usingVar := range e.UsingVars {
val, err := usingVar.Eval(chunk.Row{})
if err != nil {
return err
}
param := prepared.Params[i].(*driver.ParamMarkerExpr)
param.Datum = val
param.InExecute = true
vars.PreparedParams = append(vars.PreparedParams, val)
}
param := prepared.Params[i].(*driver.ParamMarkerExpr)
param.Datum = val
param.InExecute = true
vars.PreparedParams = append(vars.PreparedParams, val)
}

if prepared.SchemaVersion != is.SchemaMetaVersion() {
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
Expand Down
3 changes: 3 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ func (b *PlanBuilder) buildExecute(v *ast.ExecuteStmt) (Plan, error) {
vars = append(vars, newExpr)
}
exe := &Execute{Name: v.Name, UsingVars: vars, ExecID: v.ExecID}
if v.BinaryArgs != nil {
exe.PrepareParams = v.BinaryArgs.([]types.Datum)
}
return exe, nil
}

Expand Down
Loading

0 comments on commit cb23b52

Please sign in to comment.