Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 67 additions & 5 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! This module provides a builder for creating LogicalPlans

use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use arrow::{
datatypes::{Schema, SchemaRef},
Expand Down Expand Up @@ -220,10 +223,7 @@ impl LogicalPlanBuilder {
for e in expr {
match e {
Expr::Wildcard => {
(0..input_schema.fields().len()).for_each(|i| {
projected_expr
.push(Expr::Column(input_schema.field(i).qualified_column()))
});
projected_expr.extend(expand_wildcard(input_schema, &self.plan)?)
}
_ => projected_expr
.push(columnize_expr(normalize_col(e, &self.plan)?, input_schema)),
Expand Down Expand Up @@ -508,6 +508,47 @@ pub fn union_with_alias(
})
}

/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
///
/// Columns that appear in a `USING` join condition are logically merged, so
/// only one qualified copy of each such column is kept in the expansion; all
/// other fields of `schema` expand to a qualified `Expr::Column`.
pub(crate) fn expand_wildcard(
    schema: &DFSchema,
    plan: &LogicalPlan,
) -> Result<Vec<Expr>> {
    // For each USING join condition, keep only the first qualified column
    // (after sorting, so the retained column is deterministic) and mark the
    // rest to be skipped during expansion.
    let columns_to_skip = plan
        .using_columns()?
        .into_iter()
        .flat_map(|cols| {
            let mut cols = cols.into_iter().collect::<Vec<_>>();
            // sort join columns to make sure we consistently keep the same
            // qualified column
            cols.sort();
            cols.into_iter().skip(1)
        })
        .collect::<HashSet<_>>();

    // An empty skip-set filters nothing, so a single pass covers both the
    // plain-projection and USING-join cases.
    Ok(schema
        .fields()
        .iter()
        .filter_map(|f| {
            let col = f.qualified_column();
            if columns_to_skip.contains(&col) {
                None
            } else {
                Some(Expr::Column(col))
            }
        })
        .collect::<Vec<Expr>>())
}

#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
Expand Down Expand Up @@ -587,6 +628,27 @@ mod tests {
Ok(())
}

#[test]
fn plan_using_join_wildcard_projection() -> Result<()> {
    // Build `t1 INNER JOIN t2 USING (id)` and project `*` over the result.
    let right = LogicalPlanBuilder::scan_empty(Some("t2"), &employee_schema(), None)?
        .build()?;
    let left = LogicalPlanBuilder::scan_empty(Some("t1"), &employee_schema(), None)?;
    let plan = left
        .join_using(&right, JoinType::Inner, vec!["id"])?
        .project(vec![Expr::Wildcard])?
        .build()?;

    // The USING column `id` must appear only once in the expanded projection.
    let actual = format!("{:?}", plan);
    let expected = "Projection: #t1.id, #t1.first_name, #t1.last_name, #t1.state, #t1.salary, #t2.first_name, #t2.last_name, #t2.state, #t2.salary\
    \n Join: Using #t1.id = #t2.id\
    \n TableScan: t1 projection=None\
    \n TableScan: t2 projection=None";

    assert_eq!(expected, actual);

    Ok(())
}

#[test]
fn plan_builder_union_combined_single_union() -> Result<()> {
let plan = LogicalPlanBuilder::scan_empty(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::fmt;
use std::sync::Arc;

/// A named reference to a qualified field in a schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Column {
/// relation/table name.
pub relation: Option<String>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! Logical query plans can then be optimized and executed directly, or translated into
//! physical query plans and executed.

mod builder;
pub(crate) mod builder;
mod dfschema;
mod display;
mod expr;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use crate::error::DataFusionError;
use crate::logical_plan::dfschema::DFSchemaRef;
use crate::sql::parser::FileType;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use std::collections::HashSet;
use std::{
collections::HashSet,
fmt::{self, Display},
sync::Arc,
};
Expand Down
34 changes: 28 additions & 6 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use crate::datasource::TableProvider;
use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
and, col, lit, normalize_col, union_with_alias, Column, DFSchema, Expr, LogicalPlan,
LogicalPlanBuilder, Operator, PlanType, StringifiedPlan, ToDFSchema,
and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column,
DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, StringifiedPlan,
ToDFSchema,
};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
Expand Down Expand Up @@ -56,7 +57,7 @@ use sqlparser::parser::ParserError::ParserError;
use super::{
parser::DFParser,
utils::{
can_columns_satisfy_exprs, expand_wildcard, expr_as_column_expr, extract_aliases,
can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
find_aggregate_exprs, find_column_exprs, find_window_exprs,
group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
resolve_positions_to_exprs,
Expand Down Expand Up @@ -687,9 +688,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.map(|expr| self.sql_select_to_rex(expr, input_schema))
.collect::<Result<Vec<Expr>>>()?
.iter()
.flat_map(|expr| expand_wildcard(expr, input_schema))
.map(|expr| normalize_col(expr, plan))
.into_iter()
.map(|expr| {
Ok(match expr {
Expr::Wildcard => expand_wildcard(input_schema, plan)?,
_ => vec![normalize_col(expr, plan)?],
})
})
.flat_map(|res| match res {
Ok(v) => v.into_iter().map(Ok).collect(),
Err(e) => vec![Err(e)],
})
.collect::<Result<Vec<Expr>>>()
}

Expand Down Expand Up @@ -2773,6 +2782,19 @@ mod tests {
quick_test(sql, expected);
}

#[test]
fn project_wildcard_on_join_with_using() {
    // `SELECT *` over a USING join must expand the join key only once.
    let sql =
        "SELECT * FROM lineitem JOIN lineitem as lineitem2 USING (l_item_id)";
    let expected = "Projection: #lineitem.l_item_id, #lineitem.l_description, #lineitem.price, #lineitem2.l_description, #lineitem2.price\
    \n Join: Using #lineitem.l_item_id = #lineitem2.l_item_id\
    \n TableScan: lineitem projection=None\
    \n TableScan: lineitem2 projection=None";
    quick_test(sql, expected);
}

#[test]
fn equijoin_explicit_syntax_3_tables() {
let sql = "SELECT id, order_id, l_description \
Expand Down
14 changes: 1 addition & 13 deletions datafusion/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,14 @@

//! SQL Utility Functions

use crate::logical_plan::{DFSchema, Expr, LogicalPlan};
use crate::logical_plan::{Expr, LogicalPlan};
use crate::scalar::ScalarValue;
use crate::{
error::{DataFusionError, Result},
logical_plan::{Column, ExpressionVisitor, Recursion},
};
use std::collections::HashMap;

/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s.
pub(crate) fn expand_wildcard(expr: &Expr, schema: &DFSchema) -> Vec<Expr> {
match expr {
Expr::Wildcard => schema
.fields()
.iter()
.map(|f| Expr::Column(f.qualified_column()))
.collect::<Vec<Expr>>(),
_ => vec![expr.clone()],
}
}

/// Collect all deeply nested `Expr::AggregateFunction` and
/// `Expr::AggregateUDF`. They are returned in order of occurrence (depth
/// first), with duplicates omitted.
Expand Down