Skip to content

Commit d2a15b3

Browse files
authored
feat: support logical plan for EXECUTE statement (#13194)
* Add Execute LogicalPlan * Fix compile * Add tests
1 parent f2bebcd commit d2a15b3

File tree

12 files changed

+130
-12
lines changed

12 files changed

+130
-12
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,9 @@ impl DefaultPhysicalPlanner {
12011201
// statement can be prepared)
12021202
return not_impl_err!("Unsupported logical plan: Prepare");
12031203
}
1204+
LogicalPlan::Execute(_) => {
1205+
return not_impl_err!("Unsupported logical plan: Execute");
1206+
}
12041207
LogicalPlan::Dml(dml) => {
12051208
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
12061209
return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);

datafusion/expr/src/expr_rewriter/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ impl NamePreserver {
314314
| LogicalPlan::Join(_)
315315
| LogicalPlan::TableScan(_)
316316
| LogicalPlan::Limit(_)
317+
| LogicalPlan::Execute(_)
317318
),
318319
}
319320
}

datafusion/expr/src/logical_plan/display.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ use std::collections::HashMap;
2020
use std::fmt;
2121

2222
use crate::{
23-
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
24-
Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery,
25-
Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
26-
Unnest, Values, Window,
23+
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Execute,
24+
Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
25+
RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
26+
TableProviderFilterPushDown, TableScan, Unnest, Values, Window,
2727
};
2828

2929
use crate::dml::CopyTo;
@@ -626,6 +626,15 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
626626
"Data Types": format!("{:?}", data_types)
627627
})
628628
}
629+
LogicalPlan::Execute(Execute {
630+
name, parameters, ..
631+
}) => {
632+
json!({
633+
"Node Type": "Execute",
634+
"Name": name,
635+
"Parameters": expr_vec_fmt!(parameters),
636+
})
637+
}
629638
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
630639
json!({
631640
"Node Type": "DescribeTable"

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub use ddl::{
3636
pub use dml::{DmlStatement, WriteOp};
3737
pub use plan::{
3838
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
39-
DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
39+
DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join,
4040
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
4141
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
4242
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ pub enum LogicalPlan {
266266
/// Prepare a statement and find any bind parameters
267267
/// (e.g. `?`). This is used to implement SQL-prepared statements.
268268
Prepare(Prepare),
269+
/// Execute a prepared statement. This is used to implement SQL 'EXECUTE'.
270+
Execute(Execute),
269271
/// Data Manipulation Language (DML): Insert / Update / Delete
270272
Dml(DmlStatement),
271273
/// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
@@ -314,6 +316,7 @@ impl LogicalPlan {
314316
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
315317
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
316318
LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
319+
LogicalPlan::Execute(Execute { schema, .. }) => schema,
317320
LogicalPlan::Explain(explain) => &explain.schema,
318321
LogicalPlan::Analyze(analyze) => &analyze.schema,
319322
LogicalPlan::Extension(extension) => extension.node.schema(),
@@ -457,6 +460,7 @@ impl LogicalPlan {
457460
| LogicalPlan::Statement { .. }
458461
| LogicalPlan::EmptyRelation { .. }
459462
| LogicalPlan::Values { .. }
463+
| LogicalPlan::Execute { .. }
460464
| LogicalPlan::DescribeTable(_) => vec![],
461465
}
462466
}
@@ -560,6 +564,7 @@ impl LogicalPlan {
560564
LogicalPlan::Subquery(_) => Ok(None),
561565
LogicalPlan::EmptyRelation(_)
562566
| LogicalPlan::Prepare(_)
567+
| LogicalPlan::Execute(_)
563568
| LogicalPlan::Statement(_)
564569
| LogicalPlan::Values(_)
565570
| LogicalPlan::Explain(_)
@@ -712,6 +717,7 @@ impl LogicalPlan {
712717
LogicalPlan::Analyze(_) => Ok(self),
713718
LogicalPlan::Explain(_) => Ok(self),
714719
LogicalPlan::Prepare(_) => Ok(self),
720+
LogicalPlan::Execute(_) => Ok(self),
715721
LogicalPlan::TableScan(_) => Ok(self),
716722
LogicalPlan::EmptyRelation(_) => Ok(self),
717723
LogicalPlan::Statement(_) => Ok(self),
@@ -1072,6 +1078,14 @@ impl LogicalPlan {
10721078
input: Arc::new(input),
10731079
}))
10741080
}
1081+
LogicalPlan::Execute(Execute { name, schema, .. }) => {
1082+
self.assert_no_inputs(inputs)?;
1083+
Ok(LogicalPlan::Execute(Execute {
1084+
name: name.clone(),
1085+
schema: Arc::clone(schema),
1086+
parameters: expr,
1087+
}))
1088+
}
10751089
LogicalPlan::TableScan(ts) => {
10761090
self.assert_no_inputs(inputs)?;
10771091
Ok(LogicalPlan::TableScan(TableScan {
@@ -1330,6 +1344,7 @@ impl LogicalPlan {
13301344
| LogicalPlan::Copy(_)
13311345
| LogicalPlan::DescribeTable(_)
13321346
| LogicalPlan::Prepare(_)
1347+
| LogicalPlan::Execute(_)
13331348
| LogicalPlan::Statement(_)
13341349
| LogicalPlan::Extension(_) => None,
13351350
}
@@ -1933,6 +1948,9 @@ impl LogicalPlan {
19331948
}) => {
19341949
write!(f, "Prepare: {name:?} {data_types:?} ")
19351950
}
1951+
LogicalPlan::Execute(Execute { name, parameters, .. }) => {
1952+
write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters))
1953+
}
19361954
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
19371955
write!(f, "DescribeTable")
19381956
}
@@ -2599,6 +2617,27 @@ pub struct Prepare {
25992617
pub input: Arc<LogicalPlan>,
26002618
}
26012619

2620+
/// Execute a prepared statement.
2621+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2622+
pub struct Execute {
2623+
/// The name of the prepared statement to execute
2624+
pub name: String,
2625+
/// The execute parameters
2626+
pub parameters: Vec<Expr>,
2627+
/// Dummy schema
2628+
pub schema: DFSchemaRef,
2629+
}
2630+
2631+
// Comparison excludes the `schema` field.
2632+
impl PartialOrd for Execute {
2633+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2634+
match self.name.partial_cmp(&other.name) {
2635+
Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters),
2636+
cmp => cmp,
2637+
}
2638+
}
2639+
}
2640+
26022641
/// Describe the schema of table
26032642
///
26042643
/// # Example output:

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@
3838
//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
3939
use crate::{
4040
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
41-
Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit,
42-
LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
43-
Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values,
44-
Window,
41+
Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
42+
Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
43+
Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode,
44+
Values, Window,
4545
};
4646
use std::ops::Deref;
4747
use std::sync::Arc;
@@ -363,6 +363,7 @@ impl TreeNode for LogicalPlan {
363363
| LogicalPlan::Statement { .. }
364364
| LogicalPlan::EmptyRelation { .. }
365365
| LogicalPlan::Values { .. }
366+
| LogicalPlan::Execute { .. }
366367
| LogicalPlan::DescribeTable(_) => Transformed::no(self),
367368
})
368369
}
@@ -505,6 +506,9 @@ impl LogicalPlan {
505506
.chain(fetch.iter())
506507
.map(|e| e.deref())
507508
.apply_until_stop(f),
509+
LogicalPlan::Execute(Execute { parameters, .. }) => {
510+
parameters.iter().apply_until_stop(f)
511+
}
508512
// plans without expressions
509513
LogicalPlan::EmptyRelation(_)
510514
| LogicalPlan::RecursiveQuery(_)
@@ -734,6 +738,20 @@ impl LogicalPlan {
734738
})
735739
})
736740
}
741+
LogicalPlan::Execute(Execute {
742+
parameters,
743+
name,
744+
schema,
745+
}) => parameters
746+
.into_iter()
747+
.map_until_stop_and_collect(f)?
748+
.update_data(|parameters| {
749+
LogicalPlan::Execute(Execute {
750+
parameters,
751+
name,
752+
schema,
753+
})
754+
}),
737755
// plans without expressions
738756
LogicalPlan::EmptyRelation(_)
739757
| LogicalPlan::Unnest(_)

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,8 @@ impl OptimizerRule for CommonSubexprEliminate {
553553
| LogicalPlan::Copy(_)
554554
| LogicalPlan::Unnest(_)
555555
| LogicalPlan::RecursiveQuery(_)
556-
| LogicalPlan::Prepare(_) => {
556+
| LogicalPlan::Prepare(_)
557+
| LogicalPlan::Execute(_) => {
557558
// This rule handles recursion itself in a `ApplyOrder::TopDown` like
558559
// manner.
559560
plan.map_children(|c| self.rewrite(c, config))?

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,8 @@ fn optimize_projections(
348348
| LogicalPlan::RecursiveQuery(_)
349349
| LogicalPlan::Statement(_)
350350
| LogicalPlan::Values(_)
351-
| LogicalPlan::DescribeTable(_) => {
351+
| LogicalPlan::DescribeTable(_)
352+
| LogicalPlan::Execute(_) => {
352353
// These operators have no inputs, so stop the optimization process.
353354
return Ok(Transformed::no(plan));
354355
}

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1633,6 +1633,9 @@ impl AsLogicalPlan for LogicalPlanNode {
16331633
LogicalPlan::RecursiveQuery(_) => Err(proto_error(
16341634
"LogicalPlan serde is not yet implemented for RecursiveQuery",
16351635
)),
1636+
LogicalPlan::Execute(_) => Err(proto_error(
1637+
"LogicalPlan serde is not yet implemented for Execute",
1638+
)),
16361639
}
16371640
}
16381641
}

datafusion/sql/src/statement.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use datafusion_expr::{
4848
CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
4949
CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable,
5050
DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation,
51-
Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
51+
Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
5252
OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr,
5353
Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
5454
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
@@ -642,6 +642,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
642642
input: Arc::new(plan),
643643
}))
644644
}
645+
Statement::Execute {
646+
name,
647+
parameters,
648+
using,
649+
} => {
650+
// `USING` is a MySQL-specific syntax and currently not supported.
651+
if !using.is_empty() {
652+
return not_impl_err!(
653+
"Execute statement with USING is not supported"
654+
);
655+
}
656+
657+
let empty_schema = DFSchema::empty();
658+
let parameters = parameters
659+
.into_iter()
660+
.map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
661+
.collect::<Result<Vec<Expr>>>()?;
662+
663+
Ok(LogicalPlan::Execute(Execute {
664+
name: ident_to_string(&name),
665+
parameters,
666+
schema: DFSchemaRef::new(empty_schema),
667+
}))
668+
}
645669

646670
Statement::ShowTables {
647671
extended,

0 commit comments

Comments
 (0)