Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,6 @@ fn criterion_benchmark(c: &mut Criterion) {
};

let raw_tpcds_sql_queries = (1..100)
// skip query 75 until it is fixed
// https://github.com/apache/datafusion/issues/17801
.filter(|q| *q != 75)
.map(|q| std::fs::read_to_string(format!("{tests_path}tpc-ds/{q}.sql")).unwrap())
.collect::<Vec<_>>();

Expand Down
37 changes: 36 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by;
use arrow::array::{builder::StringBuilder, RecordBatch};
use arrow::compute::SortOptions;
use arrow::datatypes::Schema;
use arrow_schema::Field;
use datafusion_catalog::ScanArgs;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::format::ExplainAnalyzeLevel;
Expand Down Expand Up @@ -2520,7 +2521,9 @@ impl<'a> OptimizationInvariantChecker<'a> {
previous_schema: &Arc<Schema>,
) -> Result<()> {
// if the rule is not permitted to change the schema, confirm that it did not change.
if self.rule.schema_check() && plan.schema() != *previous_schema {
if self.rule.schema_check()
&& !is_allowed_schema_change(previous_schema.as_ref(), plan.schema().as_ref())
{
internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}",
self.rule.name(),
previous_schema,
Expand All @@ -2536,6 +2539,38 @@ impl<'a> OptimizationInvariantChecker<'a> {
}
}

/// Decides whether a schema may legitimately change from `old` to `new`.
///
/// A change is accepted only when all of the following hold:
///
/// * the schema-level metadata is identical,
/// * both schemas have the same number of fields, and
/// * every pair of fields at the same position passes [`is_allowed_field_change`].
///
/// In effect the only difference tolerated today is a field going from
/// 'nullable' to 'not nullable'. Physical expressions can know more about their
/// null-ness than their logical counterparts, so this tightening may show up.
/// It is safe because, for any field, the non-nullable domain `F` is a strict
/// subset of the nullable domain `F ∪ { NULL }`: a schema promising the narrower
/// set of values cannot break assumptions made against the wider one.
fn is_allowed_schema_change(old: &Schema, new: &Schema) -> bool {
    let same_metadata = old.metadata == new.metadata;
    let same_width = old.fields.len() == new.fields.len();

    // `zip` stops at the shorter side, so the width check above is what
    // guarantees no trailing field goes uncompared.
    same_metadata
        && same_width
        && old
            .fields
            .iter()
            .zip(new.fields.iter())
            .all(|(before, after)| {
                is_allowed_field_change(before.as_ref(), after.as_ref())
            })
}

/// Decides whether a single field may change from `old_field` to `new_field`.
///
/// Name, data type and metadata must be identical. Nullability may either stay
/// the same or be tightened from nullable to non-nullable; loosening it from
/// non-nullable to nullable is rejected.
fn is_allowed_field_change(old_field: &Field, new_field: &Field) -> bool {
    let same_definition = old_field.name() == new_field.name()
        && old_field.data_type() == new_field.data_type()
        && old_field.metadata() == new_field.metadata();

    // The only forbidden transition is non-nullable -> nullable, i.e. the old
    // field was not nullable while the new one is.
    let nullability_ok = old_field.is_nullable() || !new_field.is_nullable();

    same_definition && nullability_ok
}

impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
type Node = Arc<dyn ExecutionPlan>;

Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/tests/tpcds_planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1052,9 +1052,12 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {
for sql in &sql {
let df = ctx.sql(sql).await?;
let (state, plan) = df.into_parts();
let plan = state.optimize(&plan)?;
if create_physical {
let _ = state.create_physical_plan(&plan).await?;
} else {
// Run the logical optimizer even if we are not creating the physical plan
// to ensure it will properly succeed
let _ = state.optimize(&plan)?;
}
}

Expand Down
Loading