Skip to content

Commit 3fb600d

Browse files
authored
Show optimized physical and logical plans in EXPLAIN (#744)
* Show optimized physical and logical plans in EXPLAIN * rewrite tests * reformat
1 parent b5e034b commit 3fb600d

File tree

8 files changed

+158
-55
lines changed

8 files changed

+158
-55
lines changed

datafusion/src/logical_plan/builder.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ impl LogicalPlanBuilder {
399399
/// Create an expression to represent the explanation of the plan
400400
pub fn explain(&self, verbose: bool) -> Result<Self> {
401401
let stringified_plans = vec![StringifiedPlan::new(
402-
PlanType::LogicalPlan,
402+
PlanType::InitialLogicalPlan,
403403
format!("{:#?}", self.plan.clone()),
404404
)];
405405

@@ -740,14 +740,24 @@ mod tests {
740740
#[test]
741741
fn stringified_plan() {
742742
let stringified_plan =
743-
StringifiedPlan::new(PlanType::LogicalPlan, "...the plan...");
743+
StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
744+
assert!(stringified_plan.should_display(true));
745+
assert!(!stringified_plan.should_display(false)); // not in non verbose mode
746+
747+
let stringified_plan =
748+
StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
744749
assert!(stringified_plan.should_display(true));
745750
assert!(stringified_plan.should_display(false)); // display in non verbose mode too
746751

747752
let stringified_plan =
748-
StringifiedPlan::new(PlanType::PhysicalPlan, "...the plan...");
753+
StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
749754
assert!(stringified_plan.should_display(true));
750-
assert!(!stringified_plan.should_display(false));
755+
assert!(!stringified_plan.should_display(false)); // not in non verbose mode
756+
757+
let stringified_plan =
758+
StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
759+
assert!(stringified_plan.should_display(true));
760+
assert!(stringified_plan.should_display(false)); // display in non verbose mode
751761

752762
let stringified_plan = StringifiedPlan::new(
753763
PlanType::OptimizedLogicalPlan {

datafusion/src/logical_plan/plan.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -805,28 +805,35 @@ impl fmt::Debug for LogicalPlan {
805805
}
806806
}
807807

808-
/// Represents which type of plan
808+
/// Represents which type of plan, when storing multiple
809+
/// for use in EXPLAIN plans
809810
#[derive(Debug, Clone, PartialEq)]
810811
pub enum PlanType {
811812
/// The initial LogicalPlan provided to DataFusion
812-
LogicalPlan,
813+
InitialLogicalPlan,
813814
/// The LogicalPlan which results from applying an optimizer pass
814815
OptimizedLogicalPlan {
815816
/// The name of the optimizer which produced this plan
816817
optimizer_name: String,
817818
},
818-
/// The physical plan, prepared for execution
819-
PhysicalPlan,
819+
/// The final, fully optimized LogicalPlan that was converted to a physical plan
820+
FinalLogicalPlan,
821+
/// The initial physical plan, prepared for execution
822+
InitialPhysicalPlan,
823+
/// The final, fully optimized physical which would be executed
824+
FinalPhysicalPlan,
820825
}
821826

822827
impl fmt::Display for PlanType {
823828
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
824829
match self {
825-
PlanType::LogicalPlan => write!(f, "logical_plan"),
830+
PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
826831
PlanType::OptimizedLogicalPlan { optimizer_name } => {
827832
write!(f, "logical_plan after {}", optimizer_name)
828833
}
829-
PlanType::PhysicalPlan => write!(f, "physical_plan"),
834+
PlanType::FinalLogicalPlan => write!(f, "logical_plan"),
835+
PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"),
836+
PlanType::FinalPhysicalPlan => write!(f, "physical_plan"),
830837
}
831838
}
832839
}
@@ -854,7 +861,10 @@ impl StringifiedPlan {
854861
/// returns true if this plan should be displayed. Generally
855862
/// `verbose_mode = true` will display all available plans
856863
pub fn should_display(&self, verbose_mode: bool) -> bool {
857-
self.plan_type == PlanType::LogicalPlan || verbose_mode
864+
match self.plan_type {
865+
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
866+
_ => verbose_mode,
867+
}
858868
}
859869
}
860870

datafusion/src/optimizer/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ mod tests {
542542
&optimizer,
543543
true,
544544
&empty_plan,
545-
&[StringifiedPlan::new(PlanType::LogicalPlan, "...")],
545+
&[StringifiedPlan::new(PlanType::InitialLogicalPlan, "...")],
546546
schema.as_ref(),
547547
&ExecutionProps::new(),
548548
)?;
@@ -556,7 +556,7 @@ mod tests {
556556
assert!(*verbose);
557557

558558
let expected_stringified_plans = vec![
559-
StringifiedPlan::new(PlanType::LogicalPlan, "..."),
559+
StringifiedPlan::new(PlanType::InitialLogicalPlan, "..."),
560560
StringifiedPlan::new(
561561
PlanType::OptimizedLogicalPlan {
562562
optimizer_name: "test_optimizer".into(),

datafusion/src/physical_plan/explain.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,21 @@ pub struct ExplainExec {
4040
schema: SchemaRef,
4141
/// The strings to be printed
4242
stringified_plans: Vec<StringifiedPlan>,
43+
/// control which plans to print
44+
verbose: bool,
4345
}
4446

4547
impl ExplainExec {
4648
/// Create a new ExplainExec
47-
pub fn new(schema: SchemaRef, stringified_plans: Vec<StringifiedPlan>) -> Self {
49+
pub fn new(
50+
schema: SchemaRef,
51+
stringified_plans: Vec<StringifiedPlan>,
52+
verbose: bool,
53+
) -> Self {
4854
ExplainExec {
4955
schema,
5056
stringified_plans,
57+
verbose,
5158
}
5259
}
5360

@@ -103,8 +110,13 @@ impl ExecutionPlan for ExplainExec {
103110
let mut type_builder = StringBuilder::new(self.stringified_plans.len());
104111
let mut plan_builder = StringBuilder::new(self.stringified_plans.len());
105112

106-
for p in &self.stringified_plans {
107-
type_builder.append_value(&p.plan_type.to_string())?;
113+
let plans_to_print = self
114+
.stringified_plans
115+
.iter()
116+
.filter(|s| s.should_display(self.verbose));
117+
118+
for p in plans_to_print {
119+
type_builder.append_value(p.plan_type.to_string())?;
108120
plan_builder.append_value(&*p.plan)?;
109121
}
110122

datafusion/src/physical_plan/planner.rs

Lines changed: 65 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
240240
logical_plan: &LogicalPlan,
241241
ctx_state: &ExecutionContextState,
242242
) -> Result<Arc<dyn ExecutionPlan>> {
243-
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
244-
self.optimize_plan(plan, ctx_state)
243+
match self.handle_explain(logical_plan, ctx_state)? {
244+
Some(plan) => Ok(plan),
245+
None => {
246+
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
247+
self.optimize_plan(plan, ctx_state)
248+
}
249+
}
245250
}
246251

247252
/// Create a physical expression from a logical expression
@@ -280,7 +285,7 @@ impl DefaultPhysicalPlanner {
280285
Self { extension_planners }
281286
}
282287

283-
/// Optimize a physical plan
288+
/// Optimize a physical plan by applying each physical optimizer
284289
fn optimize_plan(
285290
&self,
286291
plan: Arc<dyn ExecutionPlan>,
@@ -749,32 +754,9 @@ impl DefaultPhysicalPlanner {
749754
"Unsupported logical plan: CreateExternalTable".to_string(),
750755
))
751756
}
752-
LogicalPlan::Explain {
753-
verbose,
754-
plan,
755-
stringified_plans,
756-
schema,
757-
} => {
758-
let input = self.create_initial_plan(plan, ctx_state)?;
759-
760-
let mut stringified_plans = stringified_plans
761-
.iter()
762-
.filter(|s| s.should_display(*verbose))
763-
.cloned()
764-
.collect::<Vec<_>>();
765-
766-
// add in the physical plan if requested
767-
if *verbose {
768-
stringified_plans.push(StringifiedPlan::new(
769-
PlanType::PhysicalPlan,
770-
displayable(input.as_ref()).indent().to_string(),
771-
));
772-
}
773-
Ok(Arc::new(ExplainExec::new(
774-
SchemaRef::new(schema.as_ref().to_owned().into()),
775-
stringified_plans,
776-
)))
777-
}
757+
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
758+
"Unsupported logical plan: Explain must be root of the plan".to_string(),
759+
)),
778760
LogicalPlan::Extension { node } => {
779761
let physical_inputs = node
780762
.inputs()
@@ -1315,6 +1297,60 @@ impl DefaultPhysicalPlanner {
13151297
options,
13161298
})
13171299
}
1300+
1301+
/// Handles capturing the various plans for EXPLAIN queries
1302+
///
1303+
/// Returns
1304+
/// Some(plan) if optimized, and None if logical_plan was not an
1305+
/// explain (and thus needs to be optimized as normal)
1306+
fn handle_explain(
1307+
&self,
1308+
logical_plan: &LogicalPlan,
1309+
ctx_state: &ExecutionContextState,
1310+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1311+
if let LogicalPlan::Explain {
1312+
verbose,
1313+
plan,
1314+
stringified_plans,
1315+
schema,
1316+
} = logical_plan
1317+
{
1318+
let final_logical_plan = StringifiedPlan::new(
1319+
PlanType::FinalLogicalPlan,
1320+
plan.display_indent().to_string(),
1321+
);
1322+
1323+
let input = self.create_initial_plan(plan, ctx_state)?;
1324+
1325+
let initial_physical_plan = StringifiedPlan::new(
1326+
PlanType::InitialPhysicalPlan,
1327+
displayable(input.as_ref()).indent().to_string(),
1328+
);
1329+
1330+
let input = self.optimize_plan(input, ctx_state)?;
1331+
1332+
let final_physical_plan = StringifiedPlan::new(
1333+
PlanType::FinalPhysicalPlan,
1334+
displayable(input.as_ref()).indent().to_string(),
1335+
);
1336+
1337+
let stringified_plans = stringified_plans
1338+
.iter()
1339+
.cloned()
1340+
.chain(std::iter::once(final_logical_plan))
1341+
.chain(std::iter::once(initial_physical_plan))
1342+
.chain(std::iter::once(final_physical_plan))
1343+
.collect::<Vec<_>>();
1344+
1345+
Ok(Some(Arc::new(ExplainExec::new(
1346+
SchemaRef::new(schema.as_ref().to_owned().into()),
1347+
stringified_plans,
1348+
*verbose,
1349+
))))
1350+
} else {
1351+
Ok(None)
1352+
}
1353+
}
13181354
}
13191355

13201356
fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {

datafusion/src/sql/planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
234234
let plan = self.sql_statement_to_plan(statement)?;
235235

236236
let stringified_plans = vec![StringifiedPlan::new(
237-
PlanType::LogicalPlan,
238-
format!("{:#?}", plan),
237+
PlanType::InitialLogicalPlan,
238+
plan.display_indent().to_string(),
239239
)];
240240

241241
let schema = LogicalPlan::explain_schema();

datafusion/tests/sql.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1972,17 +1972,28 @@ async fn csv_explain() {
19721972
register_aggregate_csv_by_sql(&mut ctx).await;
19731973
let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10";
19741974
let actual = execute(&mut ctx, sql).await;
1975-
let expected = vec![vec![
1976-
"logical_plan",
1977-
"Projection: #aggregate_test_100.c1\
1978-
\n Filter: #aggregate_test_100.c2 Gt Int64(10)\
1979-
\n TableScan: aggregate_test_100 projection=None",
1975+
let actual = normalize_vec_for_explain(actual);
1976+
let expected = vec![
1977+
vec![
1978+
"logical_plan",
1979+
"Projection: #aggregate_test_100.c1\
1980+
\n Filter: #aggregate_test_100.c2 Gt Int64(10)\
1981+
\n TableScan: aggregate_test_100 projection=Some([0, 1])"
1982+
],
1983+
vec!["physical_plan",
1984+
"ProjectionExec: expr=[c1@0 as c1]\
1985+
\n CoalesceBatchesExec: target_batch_size=4096\
1986+
\n FilterExec: CAST(c2@1 AS Int64) > 10\
1987+
\n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES)\
1988+
\n CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true\
1989+
\n"
19801990
]];
19811991
assert_eq!(expected, actual);
19821992

19831993
// Also, expect same result with lowercase explain
19841994
let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10";
19851995
let actual = execute(&mut ctx, sql).await;
1996+
let actual = normalize_vec_for_explain(actual);
19861997
assert_eq!(expected, actual);
19871998
}
19881999

@@ -3921,3 +3932,27 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
39213932
assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
39223933
Ok(())
39233934
}
3935+
3936+
// Normalizes parts of an explain plan that vary from run to run (such as path)
3937+
fn normalize_for_explain(s: &str) -> String {
3938+
// Convert things like /Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv
3939+
// to ARROW_TEST_DATA/csv/aggregate_test_100.csv
3940+
let data_path = datafusion::test_util::arrow_test_data();
3941+
let s = s.replace(&data_path, "ARROW_TEST_DATA");
3942+
3943+
// convert things like partitioning=RoundRobinBatch(16)
3944+
// to partitioning=RoundRobinBatch(NUM_CORES)
3945+
let needle = format!("RoundRobinBatch({})", num_cpus::get());
3946+
s.replace(&needle, "RoundRobinBatch(NUM_CORES)")
3947+
}
3948+
3949+
/// Applies normalize_for_explain to every line
3950+
fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> Vec<Vec<String>> {
3951+
v.into_iter()
3952+
.map(|l| {
3953+
l.into_iter()
3954+
.map(|s| normalize_for_explain(&s))
3955+
.collect::<Vec<_>>()
3956+
})
3957+
.collect::<Vec<_>>()
3958+
}

datafusion/tests/user_defined_plan.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,9 @@ async fn topk_plan() -> Result<()> {
163163
let mut ctx = setup_table(make_topk_context()).await?;
164164

165165
let expected = vec![
166-
"| logical_plan after topk | TopK: k=3 |",
167-
"| | Projection: #sales.customer_id, #sales.revenue |",
168-
"| | TableScan: sales projection=Some([0, 1]) |",
166+
"| logical_plan after topk | TopK: k=3 |",
167+
"| | Projection: #sales.customer_id, #sales.revenue |",
168+
"| | TableScan: sales projection=Some([0, 1]) |",
169169
].join("\n");
170170

171171
let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);

0 commit comments

Comments
 (0)