Skip to content

Commit ec15558

Browse files
authored
Fix duplicate field name error in Join::try_new_with_project_input during physical planning (#16454)
* Fix duplicates on Join creation during physcial planning * Add Substrait reproducer * Better error message & more doc * Handle case for right/left/full joins as well
1 parent e162ec5 commit ec15558

File tree

10 files changed

+1033
-60
lines changed

10 files changed

+1033
-60
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,25 @@ impl DFSchema {
206206
Ok(dfschema)
207207
}
208208

209+
/// Return the same schema, where all fields have a given qualifier.
210+
pub fn with_field_specific_qualified_schema(
211+
&self,
212+
qualifiers: Vec<Option<TableReference>>,
213+
) -> Result<Self> {
214+
if qualifiers.len() != self.fields().len() {
215+
return _plan_err!(
216+
"Number of qualifiers must match number of fields. Expected {}, got {}",
217+
self.fields().len(),
218+
qualifiers.len()
219+
);
220+
}
221+
Ok(DFSchema {
222+
inner: Arc::clone(&self.inner),
223+
field_qualifiers: qualifiers,
224+
functional_dependencies: self.functional_dependencies.clone(),
225+
})
226+
}
227+
209228
/// Check if the schema have some fields with the same name
210229
pub fn check_names(&self) -> Result<()> {
211230
let mut qualified_names = BTreeSet::new();

datafusion/core/src/physical_planner.rs

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ use datafusion_physical_plan::execution_plan::InvariantLevel;
9393
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
9494
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
9595
use datafusion_physical_plan::unnest::ListUnnest;
96+
use datafusion_sql::TableReference;
9697
use sqlparser::ast::NullTreatment;
9798

9899
use async_trait::async_trait;
@@ -939,8 +940,8 @@ impl DefaultPhysicalPlanner {
939940

940941
// 2 Children
941942
LogicalPlan::Join(Join {
942-
left,
943-
right,
943+
left: original_left,
944+
right: original_right,
944945
on: keys,
945946
filter,
946947
join_type,
@@ -963,23 +964,25 @@ impl DefaultPhysicalPlanner {
963964
let (left, left_col_keys, left_projected) =
964965
wrap_projection_for_join_if_necessary(
965966
&left_keys,
966-
left.as_ref().clone(),
967+
original_left.as_ref().clone(),
967968
)?;
968969
let (right, right_col_keys, right_projected) =
969970
wrap_projection_for_join_if_necessary(
970971
&right_keys,
971-
right.as_ref().clone(),
972+
original_right.as_ref().clone(),
972973
)?;
973974
let column_on = (left_col_keys, right_col_keys);
974975

975976
let left = Arc::new(left);
976977
let right = Arc::new(right);
977-
let new_join = LogicalPlan::Join(Join::try_new_with_project_input(
978+
let (new_join, requalified) = Join::try_new_with_project_input(
978979
node,
979980
Arc::clone(&left),
980981
Arc::clone(&right),
981982
column_on,
982-
)?);
983+
)?;
984+
985+
let new_join = LogicalPlan::Join(new_join);
983986

984987
// If inputs were projected then create ExecutionPlan for these new
985988
// LogicalPlan nodes.
@@ -1012,8 +1015,24 @@ impl DefaultPhysicalPlanner {
10121015

10131016
// Remove temporary projected columns
10141017
if left_projected || right_projected {
1015-
let final_join_result =
1016-
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
1018+
// Re-qualify the join schema only if the inputs were previously requalified in
1019+
// `try_new_with_project_input`. This ensures that when building the Projection
1020+
// it can correctly resolve field nullability and data types
1021+
// by disambiguating fields from the left and right sides of the join.
1022+
let qualified_join_schema = if requalified {
1023+
Arc::new(qualify_join_schema_sides(
1024+
join_schema,
1025+
original_left,
1026+
original_right,
1027+
)?)
1028+
} else {
1029+
Arc::clone(join_schema)
1030+
};
1031+
1032+
let final_join_result = qualified_join_schema
1033+
.iter()
1034+
.map(Expr::from)
1035+
.collect::<Vec<_>>();
10171036
let projection = LogicalPlan::Projection(Projection::try_new(
10181037
final_join_result,
10191038
Arc::new(new_join),
@@ -1510,6 +1529,64 @@ fn get_null_physical_expr_pair(
15101529
Ok((Arc::new(null_value), physical_name))
15111530
}
15121531

1532+
/// Qualifies the fields in a join schema with "left" and "right" qualifiers
1533+
/// without mutating the original schema. This function should only be used when
1534+
/// the join inputs have already been requalified earlier in `try_new_with_project_input`.
1535+
///
1536+
/// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution)
1537+
/// when converting expressions to fields.
1538+
fn qualify_join_schema_sides(
1539+
join_schema: &DFSchema,
1540+
left: &LogicalPlan,
1541+
right: &LogicalPlan,
1542+
) -> Result<DFSchema> {
1543+
let left_fields = left.schema().fields();
1544+
let right_fields = right.schema().fields();
1545+
let join_fields = join_schema.fields();
1546+
1547+
// Validate lengths
1548+
if join_fields.len() != left_fields.len() + right_fields.len() {
1549+
return internal_err!(
1550+
"Join schema field count must match left and right field count."
1551+
);
1552+
}
1553+
1554+
// Validate field names match
1555+
for (i, (field, expected)) in join_fields
1556+
.iter()
1557+
.zip(left_fields.iter().chain(right_fields.iter()))
1558+
.enumerate()
1559+
{
1560+
if field.name() != expected.name() {
1561+
return internal_err!(
1562+
"Field name mismatch at index {}: expected '{}', found '{}'",
1563+
i,
1564+
expected.name(),
1565+
field.name()
1566+
);
1567+
}
1568+
}
1569+
1570+
// qualify sides
1571+
let qualifiers = join_fields
1572+
.iter()
1573+
.enumerate()
1574+
.map(|(i, _)| {
1575+
if i < left_fields.len() {
1576+
Some(TableReference::Bare {
1577+
table: Arc::from("left"),
1578+
})
1579+
} else {
1580+
Some(TableReference::Bare {
1581+
table: Arc::from("right"),
1582+
})
1583+
}
1584+
})
1585+
.collect();
1586+
1587+
join_schema.with_field_specific_qualified_schema(qualifiers)
1588+
}
1589+
15131590
fn get_physical_expr_pair(
15141591
expr: &Expr,
15151592
input_dfschema: &DFSchema,

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,6 +1675,38 @@ pub fn build_join_schema(
16751675
dfschema.with_functional_dependencies(func_dependencies)
16761676
}
16771677

1678+
/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
1679+
/// conflict with the columns from the other.
1680+
/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
1681+
/// aliases, neither for columns nor for tables. DataFusion requires columns to be uniquely identifiable, in some
1682+
/// places (see e.g. DFSchema::check_names).
1683+
/// The function returns:
1684+
/// - The requalified or original left logical plan
1685+
/// - The requalified or original right logical plan
1686+
/// - If a requalification was needed or not
1687+
pub fn requalify_sides_if_needed(
1688+
left: LogicalPlanBuilder,
1689+
right: LogicalPlanBuilder,
1690+
) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1691+
let left_cols = left.schema().columns();
1692+
let right_cols = right.schema().columns();
1693+
if left_cols.iter().any(|l| {
1694+
right_cols.iter().any(|r| {
1695+
l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
1696+
})
1697+
}) {
1698+
// These names have no connection to the original plan, but they'll make the columns
1699+
// (mostly) unique.
1700+
Ok((
1701+
left.alias(TableReference::bare("left"))?,
1702+
right.alias(TableReference::bare("right"))?,
1703+
true,
1704+
))
1705+
} else {
1706+
Ok((left, right, false))
1707+
}
1708+
}
1709+
16781710
/// Add additional "synthetic" group by expressions based on functional
16791711
/// dependencies.
16801712
///

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ mod statement;
2727
pub mod tree_node;
2828

2929
pub use builder::{
30-
build_join_schema, table_scan, union, wrap_projection_for_join_if_necessary,
31-
LogicalPlanBuilder, LogicalPlanBuilderOptions, LogicalTableSource, UNNAMED_TABLE,
30+
build_join_schema, requalify_sides_if_needed, table_scan, union,
31+
wrap_projection_for_join_if_necessary, LogicalPlanBuilder, LogicalPlanBuilderOptions,
32+
LogicalTableSource, UNNAMED_TABLE,
3233
};
3334
pub use ddl::{
3435
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ use crate::utils::{
4343
grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
4444
};
4545
use crate::{
46-
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
47-
Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
48-
TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
46+
build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
47+
CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
48+
Operator, Prepare, TableProviderFilterPushDown, TableSource,
49+
WindowFunctionDefinition,
4950
};
5051

5152
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -3795,37 +3796,61 @@ impl Join {
37953796
})
37963797
}
37973798

3798-
/// Create Join with input which wrapped with projection, this method is used to help create physical join.
3799+
/// Create Join with input which wrapped with projection, this method is used in physcial planning only to help
3800+
/// create the physical join.
37993801
pub fn try_new_with_project_input(
38003802
original: &LogicalPlan,
38013803
left: Arc<LogicalPlan>,
38023804
right: Arc<LogicalPlan>,
38033805
column_on: (Vec<Column>, Vec<Column>),
3804-
) -> Result<Self> {
3806+
) -> Result<(Self, bool)> {
38053807
let original_join = match original {
38063808
LogicalPlan::Join(join) => join,
38073809
_ => return plan_err!("Could not create join with project input"),
38083810
};
38093811

3812+
let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3813+
let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3814+
3815+
let mut requalified = false;
3816+
3817+
// By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
3818+
// potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
3819+
if original_join.join_type == JoinType::Inner
3820+
|| original_join.join_type == JoinType::Left
3821+
|| original_join.join_type == JoinType::Right
3822+
|| original_join.join_type == JoinType::Full
3823+
{
3824+
(left_sch, right_sch, requalified) =
3825+
requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3826+
}
3827+
38103828
let on: Vec<(Expr, Expr)> = column_on
38113829
.0
38123830
.into_iter()
38133831
.zip(column_on.1)
38143832
.map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
38153833
.collect();
3816-
let join_schema =
3817-
build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;
38183834

3819-
Ok(Join {
3820-
left,
3821-
right,
3822-
on,
3823-
filter: original_join.filter.clone(),
3824-
join_type: original_join.join_type,
3825-
join_constraint: original_join.join_constraint,
3826-
schema: Arc::new(join_schema),
3827-
null_equality: original_join.null_equality,
3828-
})
3835+
let join_schema = build_join_schema(
3836+
left_sch.schema(),
3837+
right_sch.schema(),
3838+
&original_join.join_type,
3839+
)?;
3840+
3841+
Ok((
3842+
Join {
3843+
left,
3844+
right,
3845+
on,
3846+
filter: original_join.filter.clone(),
3847+
join_type: original_join.join_type,
3848+
join_constraint: original_join.join_constraint,
3849+
schema: Arc::new(join_schema),
3850+
null_equality: original_join.null_equality,
3851+
},
3852+
requalified,
3853+
))
38293854
}
38303855
}
38313856

datafusion/substrait/src/logical_plan/consumer/rel/cross_rel.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::logical_plan::consumer::utils::requalify_sides_if_needed;
1918
use crate::logical_plan::consumer::SubstraitConsumer;
2019
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
20+
21+
use datafusion::logical_expr::requalify_sides_if_needed;
22+
2123
use substrait::proto::CrossRel;
2224

2325
pub async fn from_cross_rel(
@@ -30,6 +32,6 @@ pub async fn from_cross_rel(
3032
let right = LogicalPlanBuilder::from(
3133
consumer.consume_rel(cross.right.as_ref().unwrap()).await?,
3234
);
33-
let (left, right) = requalify_sides_if_needed(left, right)?;
35+
let (left, right, _requalified) = requalify_sides_if_needed(left, right)?;
3436
left.cross_join(right.build()?)?.build()
3537
}

datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::logical_plan::consumer::utils::requalify_sides_if_needed;
1918
use crate::logical_plan::consumer::SubstraitConsumer;
2019
use datafusion::common::{not_impl_err, plan_err, Column, JoinType, NullEquality};
20+
use datafusion::logical_expr::requalify_sides_if_needed;
2121
use datafusion::logical_expr::utils::split_conjunction;
2222
use datafusion::logical_expr::{
2323
BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
2424
};
25+
2526
use substrait::proto::{join_rel, JoinRel};
2627

2728
pub async fn from_join_rel(
@@ -38,7 +39,7 @@ pub async fn from_join_rel(
3839
let right = LogicalPlanBuilder::from(
3940
consumer.consume_rel(join.right.as_ref().unwrap()).await?,
4041
);
41-
let (left, right) = requalify_sides_if_needed(left, right)?;
42+
let (left, right, _requalified) = requalify_sides_if_needed(left, right)?;
4243

4344
let join_type = from_substrait_jointype(join.r#type)?;
4445
// The join condition expression needs full input schema and not the output schema from join since we lose columns from

0 commit comments

Comments
 (0)