Skip to content

Commit 75a376f

Browse files
author
QP Hou
authored
dedup using join column in wildcard expansion (#678)
* dedup using join column in wildcard expansion
* reuse expand_wildcard in logical plan builder
1 parent 9ba22ef commit 75a376f

File tree

6 files changed

+99
-27
lines changed

6 files changed

+99
-27
lines changed

datafusion/src/logical_plan/builder.rs

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
//! This module provides a builder for creating LogicalPlans
1919
20-
use std::{collections::HashMap, sync::Arc};
20+
use std::{
21+
collections::{HashMap, HashSet},
22+
sync::Arc,
23+
};
2124

2225
use arrow::{
2326
datatypes::{Schema, SchemaRef},
@@ -220,10 +223,7 @@ impl LogicalPlanBuilder {
220223
for e in expr {
221224
match e {
222225
Expr::Wildcard => {
223-
(0..input_schema.fields().len()).for_each(|i| {
224-
projected_expr
225-
.push(Expr::Column(input_schema.field(i).qualified_column()))
226-
});
226+
projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
227227
}
228228
_ => projected_expr
229229
.push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
@@ -508,6 +508,47 @@ pub fn union_with_alias(
508508
})
509509
}
510510

511+
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
512+
pub(crate) fn expand_wildcard(
513+
schema: &DFSchema,
514+
plan: &LogicalPlan,
515+
) -> Result<Vec<Expr>> {
516+
let using_columns = plan.using_columns()?;
517+
let columns_to_skip = using_columns
518+
.into_iter()
519+
// For each USING JOIN condition, only expand to one column in projection
520+
.map(|cols| {
521+
let mut cols = cols.into_iter().collect::<Vec<_>>();
522+
// sort join columns to make sure we consistently keep the same
523+
// qualified column
524+
cols.sort();
525+
cols.into_iter().skip(1)
526+
})
527+
.flatten()
528+
.collect::<HashSet<_>>();
529+
530+
if columns_to_skip.is_empty() {
531+
Ok(schema
532+
.fields()
533+
.iter()
534+
.map(|f| Expr::Column(f.qualified_column()))
535+
.collect::<Vec<Expr>>())
536+
} else {
537+
Ok(schema
538+
.fields()
539+
.iter()
540+
.filter_map(|f| {
541+
let col = f.qualified_column();
542+
if !columns_to_skip.contains(&col) {
543+
Some(Expr::Column(col))
544+
} else {
545+
None
546+
}
547+
})
548+
.collect::<Vec<Expr>>())
549+
}
550+
}
551+
511552
#[cfg(test)]
512553
mod tests {
513554
use arrow::datatypes::{DataType, Field};
@@ -587,6 +628,27 @@ mod tests {
587628
Ok(())
588629
}
589630

631+
#[test]
fn plan_using_join_wildcard_projection() -> Result<()> {
    // Both join inputs are empty scans over the employee schema, named t2 and t1.
    let right =
        LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)?.build()?;
    let left = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)?;

    let joined = left
        .join_using(&right, JoinType::Inner, vec!["id"])?
        .project(vec![Expr::Wildcard])?
        .build()?;
    let actual = format!("{:?}", joined);

    // The USING column `id` must appear only once in the projection (as t1.id).
    // NOTE(review): whitespace inside this literal is reproduced exactly as it
    // appears in this view; confirm it against the plan's debug formatting.
    let expected = "Projection: #t1.id, #t1.first_name, #t1.last_name, #t1.state, #t1.salary, #t2.first_name, #t2.last_name, #t2.state, #t2.salary\
    \n Join: Using #t1.id = #t2.id\
    \n TableScan: t1 projection=None\
    \n TableScan: t2 projection=None";

    assert_eq!(expected, actual);

    Ok(())
}
651+
590652
#[test]
591653
fn plan_builder_union_combined_single_union() -> Result<()> {
592654
let plan = LogicalPlanBuilder::scan_empty(

datafusion/src/logical_plan/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use std::fmt;
3434
use std::sync::Arc;
3535

3636
/// A named reference to a qualified field in a schema.
37-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37+
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
3838
pub struct Column {
3939
/// relation/table name.
4040
pub relation: Option<String>,

datafusion/src/logical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
//! Logical query plans can then be optimized and executed directly, or translated into
2222
//! physical query plans and executed.
2323
24-
mod builder;
24+
pub(crate) mod builder;
2525
mod dfschema;
2626
mod display;
2727
mod expr;

datafusion/src/logical_plan/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use crate::error::DataFusionError;
2525
use crate::logical_plan::dfschema::DFSchemaRef;
2626
use crate::sql::parser::FileType;
2727
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
28-
use std::collections::HashSet;
2928
use std::{
29+
collections::HashSet,
3030
fmt::{self, Display},
3131
sync::Arc,
3232
};

datafusion/src/sql/planner.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ use crate::datasource::TableProvider;
2727
use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
2828
use crate::logical_plan::Expr::Alias;
2929
use crate::logical_plan::{
30-
and, col, lit, normalize_col, union_with_alias, Column, DFSchema, Expr, LogicalPlan,
31-
LogicalPlanBuilder, Operator, PlanType, StringifiedPlan, ToDFSchema,
30+
and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
31+
DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, StringifiedPlan,
32+
ToDFSchema,
3233
};
3334
use crate::prelude::JoinType;
3435
use crate::scalar::ScalarValue;
@@ -56,7 +57,7 @@ use sqlparser::parser::ParserError::ParserError;
5657
use super::{
5758
parser::DFParser,
5859
utils::{
59-
can_columns_satisfy_exprs, expand_wildcard, expr_as_column_expr, extract_aliases,
60+
can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
6061
find_aggregate_exprs, find_column_exprs, find_window_exprs,
6162
group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
6263
resolve_positions_to_exprs,
@@ -687,9 +688,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
687688
.iter()
688689
.map(|expr| self.sql_select_to_rex(expr, input_schema))
689690
.collect::<Result<Vec<Expr>>>()?
690-
.iter()
691-
.flat_map(|expr| expand_wildcard(expr, input_schema))
692-
.map(|expr| normalize_col(expr, plan))
691+
.into_iter()
692+
.map(|expr| {
693+
Ok(match expr {
694+
Expr::Wildcard => expand_wildcard(input_schema, plan)?,
695+
_ => vec![normalize_col(expr, plan)?],
696+
})
697+
})
698+
.flat_map(|res| match res {
699+
Ok(v) => v.into_iter().map(Ok).collect(),
700+
Err(e) => vec![Err(e)],
701+
})
693702
.collect::<Result<Vec<Expr>>>()
694703
}
695704

@@ -2773,6 +2782,19 @@ mod tests {
27732782
quick_test(sql, expected);
27742783
}
27752784

2785+
#[test]
fn project_wildcard_on_join_with_using() {
    // `SELECT *` over a self-join with USING: the join column `l_item_id`
    // is expected once in the projection, qualified by the first relation.
    let query = concat!(
        "SELECT * ",
        "FROM lineitem ",
        "JOIN lineitem as lineitem2 ",
        "USING (l_item_id)",
    );
    let expected_plan = concat!(
        "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price",
        "\n Join: Using #lineitem.l_item_id = #lineitem2.l_item_id",
        "\n TableScan: lineitem projection=None",
        "\n TableScan: lineitem2 projection=None",
    );
    quick_test(query, expected_plan);
}
2797+
27762798
#[test]
27772799
fn equijoin_explicit_syntax_3_tables() {
27782800
let sql = "SELECT id, order_id, l_description \

datafusion/src/sql/utils.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,14 @@
1717

1818
//! SQL Utility Functions
1919
20-
use crate::logical_plan::{DFSchema, Expr, LogicalPlan};
20+
use crate::logical_plan::{Expr, LogicalPlan};
2121
use crate::scalar::ScalarValue;
2222
use crate::{
2323
error::{DataFusionError, Result},
2424
logical_plan::{Column, ExpressionVisitor, Recursion},
2525
};
2626
use std::collections::HashMap;
2727

28-
/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
29-
pub(crate) fn expand_wildcard(expr: &Expr, schema: &DFSchema) -> Vec<Expr> {
30-
match expr {
31-
Expr::Wildcard => schema
32-
.fields()
33-
.iter()
34-
.map(|f| Expr::Column(f.qualified_column()))
35-
.collect::<Vec<Expr>>(),
36-
_ => vec![expr.clone()],
37-
}
38-
}
39-
4028
/// Collect all deeply nested `Expr::AggregateFunction` and
4129
/// `Expr::AggregateUDF`. They are returned in order of occurrence (depth
4230
/// first), with duplicates omitted.

0 commit comments

Comments
 (0)