Skip to content

Commit

Permalink
Use standard tree walk in Projection Pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 8, 2024
1 parent cc42894 commit dce26fe
Showing 1 changed file with 29 additions and 118 deletions.
147 changes: 29 additions & 118 deletions datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ use datafusion_common::{
use datafusion_expr::expr::{Alias, ScalarFunction, ScalarFunctionDefinition};
use datafusion_expr::{
logical_plan::LogicalPlan, projection_schema, Aggregate, BinaryExpr, Cast, Distinct,
Expr, GroupingSet, Projection, TableScan, Window,
Expr, Projection, TableScan, Window,
};

use datafusion_expr::utils::inspect_expr_pre;
use hashbrown::HashMap;
use itertools::{izip, Itertools};

Expand Down Expand Up @@ -531,7 +532,7 @@ macro_rules! rewrite_expr_with_check {
///
/// - `Ok(Some(Expr))`: Rewrite was successful. Contains the rewritten result.
/// - `Ok(None)`: Signals that `expr` can not be rewritten.
/// - `Err(error)`: An error occured during the function call.
/// - `Err(error)`: An error occurred during the function call.
fn rewrite_expr(expr: &Expr, input: &Projection) -> Result<Option<Expr>> {
let result = match expr {
Expr::Column(col) => {
Expand Down Expand Up @@ -574,129 +575,39 @@ fn rewrite_expr(expr: &Expr, input: &Projection) -> Result<Option<Expr>> {
Ok(Some(result))
}

/// Retrieves a set of outer-referenced columns by the given expression, `expr`.
/// Note that the `Expr::to_columns()` function doesn't return these columns.
///
/// # Parameters
///
/// * `expr` - The expression to analyze for outer-referenced columns.
///
/// # Returns
///
/// returns a `HashSet<Column>` containing all outer-referenced columns.
fn outer_columns(expr: &Expr) -> HashSet<Column> {
let mut columns = HashSet::new();
outer_columns_helper(expr, &mut columns);
columns
}

/// A recursive subroutine that accumulates outer-referenced columns by the
/// Accumulates outer-referenced columns by the
/// given expression, `expr`.
///
/// # Parameters
///
/// * `expr` - The expression to analyze for outer-referenced columns.
/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
/// columns are collected.
fn outer_columns_helper(expr: &Expr, columns: &mut HashSet<Column>) {
match expr {
Expr::OuterReferenceColumn(_, col) => {
columns.insert(col.clone());
}
Expr::BinaryExpr(binary_expr) => {
outer_columns_helper(&binary_expr.left, columns);
outer_columns_helper(&binary_expr.right, columns);
}
Expr::ScalarSubquery(subquery) => {
let exprs = subquery.outer_ref_columns.iter();
outer_columns_helper_multi(exprs, columns);
}
Expr::Exists(exists) => {
let exprs = exists.subquery.outer_ref_columns.iter();
outer_columns_helper_multi(exprs, columns);
}
Expr::Alias(alias) => outer_columns_helper(&alias.expr, columns),
Expr::InSubquery(insubquery) => {
let exprs = insubquery.subquery.outer_ref_columns.iter();
outer_columns_helper_multi(exprs, columns);
}
Expr::Cast(cast) => outer_columns_helper(&cast.expr, columns),
Expr::Sort(sort) => outer_columns_helper(&sort.expr, columns),
Expr::AggregateFunction(aggregate_fn) => {
outer_columns_helper_multi(aggregate_fn.args.iter(), columns);
if let Some(filter) = aggregate_fn.filter.as_ref() {
outer_columns_helper(filter, columns);
fn outer_columns(expr: &Expr, columns: &mut HashSet<Column>) {
// inspect_expr_pre doesn't handle subquery references, so find them explicitly
inspect_expr_pre(expr, |expr| {
match expr {
Expr::OuterReferenceColumn(_, col) => {
columns.insert(col.clone());
}
if let Some(obs) = aggregate_fn.order_by.as_ref() {
outer_columns_helper_multi(obs.iter(), columns);
Expr::ScalarSubquery(subquery) => {
outer_columns_helper_multi(&subquery.outer_ref_columns, columns);
}
}
Expr::WindowFunction(window_fn) => {
outer_columns_helper_multi(window_fn.args.iter(), columns);
outer_columns_helper_multi(window_fn.order_by.iter(), columns);
outer_columns_helper_multi(window_fn.partition_by.iter(), columns);
}
Expr::GroupingSet(groupingset) => match groupingset {
GroupingSet::GroupingSets(multi_exprs) => {
multi_exprs
.iter()
.for_each(|e| outer_columns_helper_multi(e.iter(), columns));
Expr::Exists(exists) => {
outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns);
}
GroupingSet::Cube(exprs) | GroupingSet::Rollup(exprs) => {
outer_columns_helper_multi(exprs.iter(), columns);
Expr::InSubquery(insubquery) => {
outer_columns_helper_multi(
&insubquery.subquery.outer_ref_columns,
columns,
);
}
},
Expr::ScalarFunction(scalar_fn) => {
outer_columns_helper_multi(scalar_fn.args.iter(), columns);
}
Expr::Like(like) => {
outer_columns_helper(&like.expr, columns);
outer_columns_helper(&like.pattern, columns);
}
Expr::InList(in_list) => {
outer_columns_helper(&in_list.expr, columns);
outer_columns_helper_multi(in_list.list.iter(), columns);
}
Expr::Case(case) => {
let when_then_exprs = case
.when_then_expr
.iter()
.flat_map(|(first, second)| [first.as_ref(), second.as_ref()]);
outer_columns_helper_multi(when_then_exprs, columns);
if let Some(expr) = case.expr.as_ref() {
outer_columns_helper(expr, columns);
}
if let Some(expr) = case.else_expr.as_ref() {
outer_columns_helper(expr, columns);
}
}
Expr::SimilarTo(similar_to) => {
outer_columns_helper(&similar_to.expr, columns);
outer_columns_helper(&similar_to.pattern, columns);
}
Expr::TryCast(try_cast) => outer_columns_helper(&try_cast.expr, columns),
Expr::GetIndexedField(index) => outer_columns_helper(&index.expr, columns),
Expr::Between(between) => {
outer_columns_helper(&between.expr, columns);
outer_columns_helper(&between.low, columns);
outer_columns_helper(&between.high, columns);
}
Expr::Not(expr)
| Expr::IsNotFalse(expr)
| Expr::IsFalse(expr)
| Expr::IsTrue(expr)
| Expr::IsNotTrue(expr)
| Expr::IsUnknown(expr)
| Expr::IsNotUnknown(expr)
| Expr::IsNotNull(expr)
| Expr::IsNull(expr)
| Expr::Negative(expr) => outer_columns_helper(expr, columns),
Expr::Column(_)
| Expr::Literal(_)
| Expr::Wildcard { .. }
| Expr::ScalarVariable { .. }
| Expr::Placeholder(_) => (),
}
_ => {}
};
Ok(()) as Result<()>
})
// unwrap: closure above never returns Err, so can not be Err here
.unwrap();
}

/// A recursive subroutine that accumulates outer-referenced columns by the
Expand All @@ -708,10 +619,10 @@ fn outer_columns_helper(expr: &Expr, columns: &mut HashSet<Column>) {
/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
/// columns are collected.
fn outer_columns_helper_multi<'a>(
exprs: impl Iterator<Item = &'a Expr>,
exprs: impl IntoIterator<Item = &'a Expr>,
columns: &mut HashSet<Column>,
) {
exprs.for_each(|e| outer_columns_helper(e, columns));
exprs.into_iter().for_each(|e| outer_columns(e, columns));
}

/// Generates the required expressions (columns) that reside at `indices` of
Expand Down Expand Up @@ -779,8 +690,8 @@ fn indices_referred_by_expr(
expr: &Expr,
) -> Result<Vec<usize>> {
let mut cols = expr.to_columns()?;
// Get outer-referenced columns:
cols.extend(outer_columns(expr));
// Get outer-referenced (subquery) columns:
outer_columns(expr, &mut cols);
Ok(cols
.iter()
.flat_map(|col| input_schema.index_of_column(col))
Expand Down

0 comments on commit dce26fe

Please sign in to comment.