Skip to content

Commit 03cfcb2

Browse files
authored
Update API for extension planning to include logical plan (#643)
* Update API for extension planning to include logical plan * Review comments
1 parent fddab22 commit 03cfcb2

File tree

4 files changed

+93
-25
lines changed

4 files changed

+93
-25
lines changed

datafusion/src/execution/context.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3376,6 +3376,16 @@ mod tests {
33763376
"query not supported".to_string(),
33773377
))
33783378
}
3379+
3380+
fn create_physical_expr(
3381+
&self,
3382+
_expr: &Expr,
3383+
_input_dfschema: &crate::logical_plan::DFSchema,
3384+
_input_schema: &Schema,
3385+
_ctx_state: &ExecutionContextState,
3386+
) -> Result<Arc<dyn crate::physical_plan::PhysicalExpr>> {
3387+
unimplemented!()
3388+
}
33793389
}
33803390

33813391
struct MyQueryPlanner {}

datafusion/src/physical_plan/mod.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
use self::{
2121
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
2222
};
23-
use crate::execution::context::ExecutionContextState;
24-
use crate::logical_plan::LogicalPlan;
2523
use crate::physical_plan::expressions::PhysicalSortExpr;
2624
use crate::{
2725
error::{DataFusionError, Result},
@@ -122,16 +120,8 @@ impl SQLMetric {
122120
}
123121
}
124122

125-
/// Physical query planner that converts a `LogicalPlan` to an
126-
/// `ExecutionPlan` suitable for execution.
127-
pub trait PhysicalPlanner {
128-
/// Create a physical plan from a logical plan
129-
fn create_physical_plan(
130-
&self,
131-
logical_plan: &LogicalPlan,
132-
ctx_state: &ExecutionContextState,
133-
) -> Result<Arc<dyn ExecutionPlan>>;
134-
}
123+
/// Physical planner interface
124+
pub use self::planner::PhysicalPlanner;
135125

136126
/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
137127
///

datafusion/src/physical_plan/planner.rs

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ use crate::physical_plan::sort::SortExec;
3939
use crate::physical_plan::udf;
4040
use crate::physical_plan::windows::WindowAggExec;
4141
use crate::physical_plan::{hash_utils, Partitioning};
42-
use crate::physical_plan::{
43-
AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner, WindowExpr,
44-
};
42+
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
4543
use crate::prelude::JoinType;
4644
use crate::scalar::ScalarValue;
4745
use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
@@ -172,16 +170,51 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
172170
}
173171
}
174172

173+
/// Physical query planner that converts a `LogicalPlan` to an
174+
/// `ExecutionPlan` suitable for execution.
175+
pub trait PhysicalPlanner {
176+
/// Create a physical plan from a logical plan
177+
fn create_physical_plan(
178+
&self,
179+
logical_plan: &LogicalPlan,
180+
ctx_state: &ExecutionContextState,
181+
) -> Result<Arc<dyn ExecutionPlan>>;
182+
183+
/// Create a physical expression from a logical expression
184+
/// suitable for evaluation
185+
///
186+
/// `expr`: the expression to convert
187+
///
188+
/// `input_dfschema`: the logical plan schema for evaluating `e`
189+
///
190+
/// `input_schema`: the physical schema for evaluating `e`
191+
fn create_physical_expr(
192+
&self,
193+
expr: &Expr,
194+
input_dfschema: &DFSchema,
195+
input_schema: &Schema,
196+
ctx_state: &ExecutionContextState,
197+
) -> Result<Arc<dyn PhysicalExpr>>;
198+
}
199+
175200
/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
176201
pub trait ExtensionPlanner {
177202
/// Create a physical plan for a [`UserDefinedLogicalNode`].
178-
/// This errors when the planner knows how to plan the concrete implementation of `node`
179-
/// but errors while doing so, and `None` when the planner does not know how to plan the `node`
180-
/// and wants to delegate the planning to another [`ExtensionPlanner`].
203+
///
204+
/// `input_dfschema`: the logical plan schema for the inputs to this node
205+
///
206+
/// Returns an error when the planner knows how to plan the concrete
207+
/// implementation of `node` but errors while doing so.
208+
///
209+
/// Returns `None` when the planner does not know how to plan the
210+
/// `node` and wants to delegate the planning to another
211+
/// [`ExtensionPlanner`].
181212
fn plan_extension(
182213
&self,
214+
planner: &dyn PhysicalPlanner,
183215
node: &dyn UserDefinedLogicalNode,
184-
inputs: &[Arc<dyn ExecutionPlan>],
216+
logical_inputs: &[&LogicalPlan],
217+
physical_inputs: &[Arc<dyn ExecutionPlan>],
185218
ctx_state: &ExecutionContextState,
186219
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
187220
}
@@ -210,6 +243,30 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
210243
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
211244
self.optimize_plan(plan, ctx_state)
212245
}
246+
247+
/// Create a physical expression from a logical expression
248+
/// suitable for evaluation
249+
///
250+
/// `e`: the expression to convert
251+
///
252+
/// `input_dfschema`: the logical plan schema for evaluating `e`
253+
///
254+
/// `input_schema`: the physical schema for evaluating `e`
255+
fn create_physical_expr(
256+
&self,
257+
expr: &Expr,
258+
input_dfschema: &DFSchema,
259+
input_schema: &Schema,
260+
ctx_state: &ExecutionContextState,
261+
) -> Result<Arc<dyn PhysicalExpr>> {
262+
DefaultPhysicalPlanner::create_physical_expr(
263+
self,
264+
expr,
265+
input_dfschema,
266+
input_schema,
267+
ctx_state,
268+
)
269+
}
213270
}
214271

215272
impl DefaultPhysicalPlanner {
@@ -721,7 +778,7 @@ impl DefaultPhysicalPlanner {
721778
)))
722779
}
723780
LogicalPlan::Extension { node } => {
724-
let inputs = node
781+
let physical_inputs = node
725782
.inputs()
726783
.into_iter()
727784
.map(|input_plan| self.create_initial_plan(input_plan, ctx_state))
@@ -733,7 +790,13 @@ impl DefaultPhysicalPlanner {
733790
if let Some(plan) = maybe_plan {
734791
Ok(Some(plan))
735792
} else {
736-
planner.plan_extension(node.as_ref(), &inputs, ctx_state)
793+
planner.plan_extension(
794+
self,
795+
node.as_ref(),
796+
&node.inputs(),
797+
&physical_inputs,
798+
ctx_state,
799+
)
737800
}
738801
},
739802
)?;
@@ -1644,8 +1707,10 @@ mod tests {
16441707
/// Create a physical plan for an extension node
16451708
fn plan_extension(
16461709
&self,
1710+
_planner: &dyn PhysicalPlanner,
16471711
_node: &dyn UserDefinedLogicalNode,
1648-
_inputs: &[Arc<dyn ExecutionPlan>],
1712+
_logical_inputs: &[&LogicalPlan],
1713+
_physical_inputs: &[Arc<dyn ExecutionPlan>],
16491714
_ctx_state: &ExecutionContextState,
16501715
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
16511716
Ok(Some(Arc::new(NoOpExecutionPlan {

datafusion/tests/user_defined_plan.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,16 +321,19 @@ impl ExtensionPlanner for TopKPlanner {
321321
/// Create a physical plan for an extension node
322322
fn plan_extension(
323323
&self,
324+
_planner: &dyn PhysicalPlanner,
324325
node: &dyn UserDefinedLogicalNode,
325-
inputs: &[Arc<dyn ExecutionPlan>],
326+
logical_inputs: &[&LogicalPlan],
327+
physical_inputs: &[Arc<dyn ExecutionPlan>],
326328
_ctx_state: &ExecutionContextState,
327329
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
328330
Ok(
329331
if let Some(topk_node) = node.as_any().downcast_ref::<TopKPlanNode>() {
330-
assert_eq!(inputs.len(), 1, "Inconsistent number of inputs");
332+
assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs");
333+
assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs");
331334
// figure out input name
332335
Some(Arc::new(TopKExec {
333-
input: inputs[0].clone(),
336+
input: physical_inputs[0].clone(),
334337
k: topk_node.k,
335338
}))
336339
} else {

0 commit comments

Comments
 (0)