Skip to content

Commit bb09947

Browse files
committed
Temporary cherry pick of apache#16064
1 parent 11f5af5 commit bb09947

File tree

4 files changed

+390
-22
lines changed

4 files changed

+390
-22
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ use arrow::array::{builder::StringBuilder, RecordBatch};
6262
use arrow::compute::SortOptions;
6363
use arrow::datatypes::{Schema, SchemaRef};
6464
use datafusion_common::display::ToStringifiedPlan;
65-
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
65+
use datafusion_common::tree_node::{
66+
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
67+
};
6668
use datafusion_common::{
6769
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
6870
ScalarValue,
@@ -2075,29 +2077,37 @@ fn maybe_fix_physical_column_name(
20752077
expr: Result<Arc<dyn PhysicalExpr>>,
20762078
input_physical_schema: &SchemaRef,
20772079
) -> Result<Arc<dyn PhysicalExpr>> {
2078-
if let Ok(e) = &expr {
2079-
if let Some(column) = e.as_any().downcast_ref::<Column>() {
2080-
let physical_field = input_physical_schema.field(column.index());
2081-
let expr_col_name = column.name();
2082-
let physical_name = physical_field.name();
2083-
2084-
if physical_name != expr_col_name {
2085-
// handle edge cases where the physical_name contains ':'.
2086-
let colon_count = physical_name.matches(':').count();
2087-
let mut splits = expr_col_name.match_indices(':');
2088-
let split_pos = splits.nth(colon_count);
2089-
2090-
if let Some((idx, _)) = split_pos {
2091-
let base_name = &expr_col_name[..idx];
2092-
if base_name == physical_name {
2093-
let updated_column = Column::new(physical_name, column.index());
2094-
return Ok(Arc::new(updated_column));
2080+
expr.and_then(|e| {
2081+
e.transform_down(|node| {
2082+
if let Some(column) = node.as_any().downcast_ref::<Column>() {
2083+
let idx = column.index();
2084+
let physical_field = input_physical_schema.field(idx);
2085+
let expr_col_name = column.name();
2086+
let physical_name = physical_field.name();
2087+
2088+
if expr_col_name != physical_name {
2089+
// handle edge cases where the physical_name contains ':'.
2090+
let colon_count = physical_name.matches(':').count();
2091+
let mut splits = expr_col_name.match_indices(':');
2092+
let split_pos = splits.nth(colon_count);
2093+
2094+
if let Some((i, _)) = split_pos {
2095+
let base_name = &expr_col_name[..i];
2096+
if base_name == physical_name {
2097+
let updated_column = Column::new(physical_name, idx);
2098+
return Ok(Transformed::yes(Arc::new(updated_column)));
2099+
}
20952100
}
20962101
}
2102+
2103+
// If names already match or fix is not possible, just leave it as it is
2104+
Ok(Transformed::no(node))
2105+
} else {
2106+
Ok(Transformed::no(node))
20972107
}
2098-
}
2099-
}
2100-
expr
2108+
})
2109+
.data()
2110+
})
21012111
}
21022112

21032113
struct OptimizationInvariantChecker<'a> {

datafusion/physical-plan/src/union.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,12 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
513513

514514
let fields = (0..first_schema.fields().len())
515515
.map(|i| {
516-
inputs
516+
// We take the name from the left side of the union to match how names are coerced during logical planning,
517+
// which also uses the left side names.
518+
let base_field = first_schema.field(i).clone();
519+
520+
// Coerce metadata and nullability across all inputs
521+
let merged_field = inputs
517522
.iter()
518523
.enumerate()
519524
.map(|(input_idx, input)| {
@@ -535,6 +540,9 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
535540
// We can unwrap this because if inputs was empty, this would've already panic'ed when we
536541
// indexed into inputs[0].
537542
.unwrap()
543+
.with_name(base_field.name());
544+
545+
merged_field
538546
})
539547
.collect::<Vec<_>>();
540548

datafusion/substrait/tests/cases/consumer_integration.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,4 +560,26 @@ mod tests {
560560
);
561561
Ok(())
562562
}
563+
564+
#[tokio::test]
565+
async fn test_multiple_unions() -> Result<()> {
566+
let plan_str = test_plan_to_string("multiple_unions.json").await?;
567+
assert_snapshot!(
568+
plan_str,
569+
@"Projection: Utf8(\"people\") AS product_category, Utf8(\"people\")__temp__0 AS product_type, product_key\
570+
\n Union\
571+
\n Projection: Utf8(\"people\"), Utf8(\"people\") AS Utf8(\"people\")__temp__0, sales.product_key\
572+
\n Left Join: sales.product_key = food.@food_id\
573+
\n TableScan: sales\
574+
\n TableScan: food\
575+
\n Union\
576+
\n Projection: people.$f3, people.$f5, people.product_key0\
577+
\n Left Join: people.product_key0 = food.@food_id\
578+
\n TableScan: people\
579+
\n TableScan: food\
580+
\n TableScan: more_products"
581+
);
582+
583+
Ok(())
584+
}
563585
}

0 commit comments

Comments
 (0)