Skip to content

Commit 02c8247

Browse files
authored
fix: incorrect NATURAL/USING JOIN schema (#14102)
* fix: incorrect NATURAL/USING JOIN schema
* Add test
* Simplify exclude_using_columns
* Add more tests
1 parent 0a2f09f commit 02c8247

File tree

2 files changed

+109
-36
lines changed

2 files changed

+109
-36
lines changed

datafusion/expr/src/utils.rs

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use datafusion_common::tree_node::{
3535
};
3636
use datafusion_common::utils::get_at_indices;
3737
use datafusion_common::{
38-
internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef,
39-
DataFusionError, HashMap, Result, TableReference,
38+
internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, HashMap,
39+
Result, TableReference,
4040
};
4141

4242
use indexmap::IndexSet;
@@ -379,14 +379,12 @@ fn get_exprs_except_skipped(
379379
}
380380
}
381381

382-
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
383-
pub fn expand_wildcard(
384-
schema: &DFSchema,
385-
plan: &LogicalPlan,
386-
wildcard_options: Option<&WildcardOptions>,
387-
) -> Result<Vec<Expr>> {
382+
/// For each column specified in the USING JOIN condition, the JOIN plan outputs it twice
383+
/// (once for each join side), but an unqualified wildcard should include it only once.
384+
/// This function returns the columns that should be excluded.
385+
fn exclude_using_columns(plan: &LogicalPlan) -> Result<HashSet<Column>> {
388386
let using_columns = plan.using_columns()?;
389-
let mut columns_to_skip = using_columns
387+
let excluded = using_columns
390388
.into_iter()
391389
// For each USING JOIN condition, only expand to one of each join column in projection
392390
.flat_map(|cols| {
@@ -395,18 +393,26 @@ pub fn expand_wildcard(
395393
// qualified column
396394
cols.sort();
397395
let mut out_column_names: HashSet<String> = HashSet::new();
398-
cols.into_iter()
399-
.filter_map(|c| {
400-
if out_column_names.contains(&c.name) {
401-
Some(c)
402-
} else {
403-
out_column_names.insert(c.name);
404-
None
405-
}
406-
})
407-
.collect::<Vec<_>>()
396+
cols.into_iter().filter_map(move |c| {
397+
if out_column_names.contains(&c.name) {
398+
Some(c)
399+
} else {
400+
out_column_names.insert(c.name);
401+
None
402+
}
403+
})
408404
})
409405
.collect::<HashSet<_>>();
406+
Ok(excluded)
407+
}
408+
409+
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
410+
pub fn expand_wildcard(
411+
schema: &DFSchema,
412+
plan: &LogicalPlan,
413+
wildcard_options: Option<&WildcardOptions>,
414+
) -> Result<Vec<Expr>> {
415+
let mut columns_to_skip = exclude_using_columns(plan)?;
410416
let excluded_columns = if let Some(WildcardOptions {
411417
exclude: opt_exclude,
412418
except: opt_except,
@@ -705,27 +711,20 @@ pub fn exprlist_to_fields<'a>(
705711
.map(|e| match e {
706712
Expr::Wildcard { qualifier, options } => match qualifier {
707713
None => {
708-
let excluded: Vec<String> = get_excluded_columns(
714+
let mut excluded = exclude_using_columns(plan)?;
715+
excluded.extend(get_excluded_columns(
709716
options.exclude.as_ref(),
710717
options.except.as_ref(),
711718
wildcard_schema,
712719
None,
713-
)?
714-
.into_iter()
715-
.map(|c| c.flat_name())
716-
.collect();
717-
Ok::<_, DataFusionError>(
718-
wildcard_schema
719-
.field_names()
720-
.iter()
721-
.enumerate()
722-
.filter(|(_, s)| !excluded.contains(s))
723-
.map(|(i, _)| wildcard_schema.qualified_field(i))
724-
.map(|(qualifier, f)| {
725-
(qualifier.cloned(), Arc::new(f.to_owned()))
726-
})
727-
.collect::<Vec<_>>(),
728-
)
720+
)?);
721+
Ok(wildcard_schema
722+
.iter()
723+
.filter(|(q, f)| {
724+
!excluded.contains(&Column::new(q.cloned(), f.name()))
725+
})
726+
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
727+
.collect::<Vec<_>>())
729728
}
730729
Some(qualifier) => {
731730
let excluded: Vec<String> = get_excluded_columns(

datafusion/sql/tests/sql_integration.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4549,3 +4549,77 @@ fn test_error_message_invalid_window_aggregate_function_signature() {
45494549
"Error during planning: sum does not support zero arguments",
45504550
);
45514551
}
4552+
4553+
// Test issue: https://github.com/apache/datafusion/issues/14058
4554+
// Select with wildcard over a USING/NATURAL JOIN should deduplicate condition columns.
4555+
#[test]
4556+
fn test_using_join_wildcard_schema() {
4557+
let sql = "SELECT * FROM orders o1 JOIN orders o2 USING (order_id)";
4558+
let plan = logical_plan(sql).unwrap();
4559+
let count = plan
4560+
.schema()
4561+
.iter()
4562+
.filter(|(_, f)| f.name() == "order_id")
4563+
.count();
4564+
// Only one order_id column
4565+
assert_eq!(count, 1);
4566+
4567+
let sql = "SELECT * FROM orders o1 NATURAL JOIN orders o2";
4568+
let plan = logical_plan(sql).unwrap();
4569+
// Only columns from one join side should be present
4570+
let expected_fields = vec![
4571+
"o1.order_id".to_string(),
4572+
"o1.customer_id".to_string(),
4573+
"o1.o_item_id".to_string(),
4574+
"o1.qty".to_string(),
4575+
"o1.price".to_string(),
4576+
"o1.delivered".to_string(),
4577+
];
4578+
assert_eq!(plan.schema().field_names(), expected_fields);
4579+
4580+
// Reproducible example of issue #14058
4581+
let sql = "WITH t1 AS (SELECT 1 AS id, 'a' AS value1),
4582+
t2 AS (SELECT 1 AS id, 'x' AS value2)
4583+
SELECT * FROM t1 NATURAL JOIN t2";
4584+
let plan = logical_plan(sql).unwrap();
4585+
assert_eq!(
4586+
plan.schema().field_names(),
4587+
[
4588+
"t1.id".to_string(),
4589+
"t1.value1".to_string(),
4590+
"t2.value2".to_string()
4591+
]
4592+
);
4593+
4594+
// Multiple joins
4595+
let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b),
4596+
t2 AS (SELECT 1 AS a, 2 AS c),
4597+
t3 AS (SELECT 1 AS c, 2 AS d)
4598+
SELECT * FROM t1 NATURAL JOIN t2 RIGHT JOIN t3 USING (c)";
4599+
let plan = logical_plan(sql).unwrap();
4600+
assert_eq!(
4601+
plan.schema().field_names(),
4602+
[
4603+
"t1.a".to_string(),
4604+
"t1.b".to_string(),
4605+
"t2.c".to_string(),
4606+
"t3.d".to_string()
4607+
]
4608+
);
4609+
4610+
// Subquery
4611+
let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b),
4612+
t2 AS (SELECT 1 AS a, 2 AS c),
4613+
t3 AS (SELECT 1 AS c, 2 AS d)
4614+
SELECT * FROM (SELECT * FROM t1 LEFT JOIN t2 USING(a)) NATURAL JOIN t3";
4615+
let plan = logical_plan(sql).unwrap();
4616+
assert_eq!(
4617+
plan.schema().field_names(),
4618+
[
4619+
"t1.a".to_string(),
4620+
"t1.b".to_string(),
4621+
"t2.c".to_string(),
4622+
"t3.d".to_string()
4623+
]
4624+
);
4625+
}

0 commit comments

Comments
 (0)