Skip to content

Commit 7c07e4d

Browse files
authored
Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable (#4316)
* Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable * fix debug print * fix comments
1 parent 209c266 commit 7c07e4d

File tree

7 files changed

+37
-2
lines changed

7 files changed

+37
-2
lines changed

datafusion/core/src/physical_optimizer/aggregate_statistics.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! Utilizing exact statistics from sources to avoid scanning data
1919
use std::sync::Arc;
2020

21-
use arrow::datatypes::Schema;
2221
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
2322

2423
use crate::execution::context::SessionConfig;
@@ -85,7 +84,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
8584
// input can be entirely removed
8685
Ok(Arc::new(ProjectionExec::try_new(
8786
projections,
88-
Arc::new(EmptyExec::new(true, Arc::new(Schema::empty()))),
87+
Arc::new(EmptyExec::new(true, plan.schema())),
8988
)?))
9089
} else {
9190
optimize_children(self, plan, config)
@@ -98,6 +97,11 @@ impl PhysicalOptimizerRule for AggregateStatistics {
9897
fn name(&self) -> &str {
9998
"aggregate_statistics"
10099
}
100+
101+
/// This rule will change the nullable properties of the schema, disable the schema check.
102+
fn schema_check(&self) -> bool {
103+
false
104+
}
101105
}
102106

103107
/// assert if the node passed as argument is a final `AggregateExec` node that can be optimized:

datafusion/core/src/physical_optimizer/coalesce_batches.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,8 @@ impl PhysicalOptimizerRule for CoalesceBatches {
7272
fn name(&self) -> &str {
7373
"coalesce_batches"
7474
}
75+
76+
fn schema_check(&self) -> bool {
77+
true
78+
}
7579
}

datafusion/core/src/physical_optimizer/enforcement.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
101101
fn name(&self) -> &str {
102102
"BasicEnforcement"
103103
}
104+
105+
fn schema_check(&self) -> bool {
106+
true
107+
}
104108
}
105109

106110
/// When the physical planner creates the Joins, the ordering of join keys is from the original query.

datafusion/core/src/physical_optimizer/hash_build_probe_order.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,10 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
211211
fn name(&self) -> &str {
212212
"hash_build_probe_order"
213213
}
214+
215+
fn schema_check(&self) -> bool {
216+
true
217+
}
214218
}
215219

216220
#[cfg(test)]

datafusion/core/src/physical_optimizer/optimizer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,10 @@ pub trait PhysicalOptimizerRule {
3636

3737
/// A human readable name for this optimizer rule
3838
fn name(&self) -> &str;
39+
40+
/// A flag to indicate whether the physical planner should valid the rule will not
41+
/// change the schema of the plan after the rewriting.
42+
/// Some of the optimization rules might change the nullable properties of the schema
43+
/// and should disable the schema check.
44+
fn schema_check(&self) -> bool;
3945
}

datafusion/core/src/physical_optimizer/repartition.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,10 @@ impl PhysicalOptimizerRule for Repartition {
233233
fn name(&self) -> &str {
234234
"repartition"
235235
}
236+
237+
fn schema_check(&self) -> bool {
238+
true
239+
}
236240
}
237241
#[cfg(test)]
238242
mod tests {

datafusion/core/src/physical_plan/planner.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1679,7 +1679,16 @@ impl DefaultPhysicalPlanner {
16791679

16801680
let mut new_plan = plan;
16811681
for optimizer in optimizers {
1682+
let before_schema = new_plan.schema();
16821683
new_plan = optimizer.optimize(new_plan, &session_state.config)?;
1684+
if optimizer.schema_check() && new_plan.schema() != before_schema {
1685+
return Err(DataFusionError::Internal(format!(
1686+
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
1687+
optimizer.name(),
1688+
before_schema,
1689+
new_plan.schema()
1690+
)));
1691+
}
16831692
observer(new_plan.as_ref(), optimizer.as_ref())
16841693
}
16851694
debug!(

0 commit comments

Comments
 (0)