Skip to content

Commit 6911435

Browse files
committed
dedup using join column in wildcard expansion
1 parent e97b86a commit 6911435

File tree

5 files changed

+110
-15
lines changed

5 files changed

+110
-15
lines changed

datafusion/src/logical_plan/builder.rs

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
//! This module provides a builder for creating LogicalPlans
1919
20-
use std::{collections::HashMap, sync::Arc};
20+
use std::{
21+
collections::{HashMap, HashSet},
22+
sync::Arc,
23+
};
2124

2225
use arrow::{
2326
datatypes::{Schema, SchemaRef},
@@ -220,10 +223,33 @@ impl LogicalPlanBuilder {
220223
for e in expr {
221224
match e {
222225
Expr::Wildcard => {
223-
(0..input_schema.fields().len()).for_each(|i| {
224-
projected_expr
225-
.push(Expr::Column(input_schema.field(i).qualified_column()))
226-
});
226+
let columns_to_skip = self
227+
.plan
228+
.using_columns()?
229+
.into_iter()
230+
// For each USING JOIN condition, only expand to one column in projection
231+
.map(|cols| {
232+
let mut cols = cols.into_iter().collect::<Vec<_>>();
233+
// sort join columns to make sure we consistently keep the same
234+
// qualified column
235+
cols.sort();
236+
cols.into_iter().skip(1)
237+
})
238+
.flatten()
239+
.collect::<HashSet<_>>();
240+
241+
if columns_to_skip.is_empty() {
242+
input_schema.fields().iter().for_each(|f| {
243+
projected_expr.push(Expr::Column(f.qualified_column()))
244+
})
245+
} else {
246+
input_schema.fields().iter().for_each(|f| {
247+
let col = f.qualified_column();
248+
if !columns_to_skip.contains(&col) {
249+
projected_expr.push(Expr::Column(col))
250+
}
251+
})
252+
}
227253
}
228254
_ => projected_expr
229255
.push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
@@ -587,6 +613,27 @@ mod tests {
587613
Ok(())
588614
}
589615

616+
/// Checks that a wildcard projection on top of a USING join emits the join
/// column only once instead of once per joined relation.
#[test]
fn plan_using_join_wildcard_projection() -> Result<()> {
    let t2 = LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)?
        .build()?;

    let plan = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)?
        .join_using(&t2, JoinType::Inner, vec!["id"])?
        .project(vec![Expr::Wildcard])?
        .build()?;

    // id column should only show up once in projection
    // (`#t2.id` is absent from the expected projection below).
    // Plan nodes are indented two spaces per nesting level.
    let expected = "Projection: #t1.id, #t1.first_name, #t1.last_name, #t1.state, #t1.salary, #t2.first_name, #t2.last_name, #t2.state, #t2.salary\
    \n  Join: Using #t1.id = #t2.id\
    \n    TableScan: t1 projection=None\
    \n    TableScan: t2 projection=None";

    assert_eq!(expected, format!("{:?}", plan));

    Ok(())
}
636+
590637
#[test]
591638
fn plan_builder_union_combined_single_union() -> Result<()> {
592639
let plan = LogicalPlanBuilder::scan_empty(

datafusion/src/logical_plan/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use std::fmt;
3434
use std::sync::Arc;
3535

3636
/// A named reference to a qualified field in a schema.
37-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
3838
pub struct Column {
3939
/// relation/table name.
4040
pub relation: Option<String>,

datafusion/src/logical_plan/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use crate::error::DataFusionError;
2525
use crate::logical_plan::dfschema::DFSchemaRef;
2626
use crate::sql::parser::FileType;
2727
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
28-
use std::collections::HashSet;
2928
use std::{
29+
collections::HashSet,
3030
fmt::{self, Display},
3131
sync::Arc,
3232
};

datafusion/src/sql/planner.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -663,13 +663,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
663663
projection: &[SelectItem],
664664
) -> Result<Vec<Expr>> {
665665
let input_schema = plan.schema();
666+
let using_columns = plan.using_columns()?;
666667

667668
projection
668669
.iter()
669670
.map(|expr| self.sql_select_to_rex(expr, input_schema))
670671
.collect::<Result<Vec<Expr>>>()?
671672
.iter()
672-
.flat_map(|expr| expand_wildcard(expr, input_schema))
673+
.flat_map(|expr| expand_wildcard(expr, input_schema, &using_columns))
673674
.map(|expr| normalize_col(expr, plan))
674675
.collect::<Result<Vec<Expr>>>()
675676
}
@@ -2738,6 +2739,19 @@ mod tests {
27382739
quick_test(sql, expected);
27392740
}
27402741

2742+
/// Checks that `SELECT *` over a `JOIN ... USING (...)` query plans a
/// projection in which the USING column appears only once.
#[test]
fn project_wildcard_on_join_with_using() {
    let sql = "SELECT * \
               FROM lineitem \
               JOIN lineitem as lineitem2 \
               USING (l_item_id)";
    // `#lineitem2.l_item_id` is intentionally absent from the projection.
    // Plan nodes are indented two spaces per nesting level.
    let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
    \n  Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
    \n    TableScan: lineitem projection=None\
    \n    TableScan: lineitem2 projection=None";
    quick_test(sql, expected);
}
2754+
27412755
#[test]
27422756
fn equijoin_explicit_syntax_3_tables() {
27432757
let sql = "SELECT id, order_id, l_description \

datafusion/src/sql/utils.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,50 @@ use crate::{
2323
error::{DataFusionError, Result},
2424
logical_plan::{Column, ExpressionVisitor, Recursion},
2525
};
26-
use std::collections::HashMap;
26+
use std::collections::{HashMap, HashSet};
2727

2828
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
29-
pub(crate) fn expand_wildcard(expr: &Expr, schema: &DFSchema) -> Vec<Expr> {
29+
pub(crate) fn expand_wildcard(
30+
expr: &Expr,
31+
schema: &DFSchema,
32+
using_columns: &[HashSet<Column>],
33+
) -> Vec<Expr> {
3034
match expr {
31-
Expr::Wildcard => schema
32-
.fields()
33-
.iter()
34-
.map(|f| Expr::Column(f.qualified_column()))
35-
.collect::<Vec<Expr>>(),
35+
Expr::Wildcard => {
36+
let columns_to_skip = using_columns
37+
.iter()
38+
// For each USING JOIN condition, only expand to one column in projection
39+
.map(|cols| {
40+
let mut cols = cols.iter().collect::<Vec<_>>();
41+
// sort join columns to make sure we consistently keep the same
42+
// qualified column
43+
cols.sort();
44+
cols.into_iter().skip(1)
45+
})
46+
.flatten()
47+
.collect::<HashSet<_>>();
48+
49+
if columns_to_skip.is_empty() {
50+
schema
51+
.fields()
52+
.iter()
53+
.map(|f| Expr::Column(f.qualified_column()))
54+
.collect::<Vec<Expr>>()
55+
} else {
56+
schema
57+
.fields()
58+
.iter()
59+
.filter_map(|f| {
60+
let col = f.qualified_column();
61+
if !columns_to_skip.contains(&col) {
62+
Some(Expr::Column(col))
63+
} else {
64+
None
65+
}
66+
})
67+
.collect::<Vec<Expr>>()
68+
}
69+
}
3670
_ => vec![expr.clone()],
3771
}
3872
}

0 commit comments

Comments
 (0)