Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Use standard tree walk in Projection Pushdown #8787

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 29 additions & 118 deletions datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ use datafusion_common::{
use datafusion_expr::expr::{Alias, ScalarFunction, ScalarFunctionDefinition};
use datafusion_expr::{
logical_plan::LogicalPlan, projection_schema, Aggregate, BinaryExpr, Cast, Distinct,
Expr, GroupingSet, Projection, TableScan, Window,
Expr, Projection, TableScan, Window,
};

use datafusion_expr::utils::inspect_expr_pre;
use hashbrown::HashMap;
use itertools::{izip, Itertools};

Expand Down Expand Up @@ -531,7 +532,7 @@ macro_rules! rewrite_expr_with_check {
///
/// - `Ok(Some(Expr))`: Rewrite was successful. Contains the rewritten result.
/// - `Ok(None)`: Signals that `expr` can not be rewritten.
/// - `Err(error)`: An error occured during the function call.
/// - `Err(error)`: An error occurred during the function call.
fn rewrite_expr(expr: &Expr, input: &Projection) -> Result<Option<Expr>> {
let result = match expr {
Expr::Column(col) => {
Expand Down Expand Up @@ -574,129 +575,39 @@ fn rewrite_expr(expr: &Expr, input: &Projection) -> Result<Option<Expr>> {
Ok(Some(result))
}

/// Retrieves a set of outer-referenced columns by the given expression, `expr`.
/// Note that the `Expr::to_columns()` function doesn't return these columns.
///
/// # Parameters
///
/// * `expr` - The expression to analyze for outer-referenced columns.
///
/// # Returns
///
/// returns a `HashSet<Column>` containing all outer-referenced columns.
fn outer_columns(expr: &Expr) -> HashSet<Column> {
let mut columns = HashSet::new();
outer_columns_helper(expr, &mut columns);
columns
}

/// A recursive subroutine that accumulates outer-referenced columns by the
/// Accumulates outer-referenced columns by the
/// given expression, `expr`.
///
/// # Parameters
///
/// * `expr` - The expression to analyze for outer-referenced columns.
/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
/// columns are collected.
fn outer_columns_helper(expr: &Expr, columns: &mut HashSet<Column>) {
match expr {
Expr::OuterReferenceColumn(_, col) => {
columns.insert(col.clone());
}
Expr::BinaryExpr(binary_expr) => {
outer_columns_helper(&binary_expr.left, columns);
outer_columns_helper(&binary_expr.right, columns);
}
Expr::ScalarSubquery(subquery) => {
let exprs = subquery.outer_ref_columns.iter();
outer_columns_helper_multi(exprs, columns);
}
Expr::Exists(exists) => {
let exprs = exists.subquery.outer_ref_columns.iter();
outer_columns_helper_multi(exprs, columns);
}
Expr::Alias(alias) => outer_columns_helper(&alias.expr, columns),
Expr::InSubquery(insubquery) => {
let exprs = insubquery.subquery.outer_ref_columns.iter();
outer_columns_helper_multi(exprs, columns);
}
Expr::Cast(cast) => outer_columns_helper(&cast.expr, columns),
Expr::Sort(sort) => outer_columns_helper(&sort.expr, columns),
Expr::AggregateFunction(aggregate_fn) => {
outer_columns_helper_multi(aggregate_fn.args.iter(), columns);
if let Some(filter) = aggregate_fn.filter.as_ref() {
outer_columns_helper(filter, columns);
fn outer_columns(expr: &Expr, columns: &mut HashSet<Column>) {
// inspect_expr_pre doesn't handle subquery references, so find them explicitly
inspect_expr_pre(expr, |expr| {
match expr {
Expr::OuterReferenceColumn(_, col) => {
columns.insert(col.clone());
}
if let Some(obs) = aggregate_fn.order_by.as_ref() {
outer_columns_helper_multi(obs.iter(), columns);
Expr::ScalarSubquery(subquery) => {
outer_columns_helper_multi(&subquery.outer_ref_columns, columns);
}
}
Expr::WindowFunction(window_fn) => {
outer_columns_helper_multi(window_fn.args.iter(), columns);
outer_columns_helper_multi(window_fn.order_by.iter(), columns);
outer_columns_helper_multi(window_fn.partition_by.iter(), columns);
}
Expr::GroupingSet(groupingset) => match groupingset {
GroupingSet::GroupingSets(multi_exprs) => {
multi_exprs
.iter()
.for_each(|e| outer_columns_helper_multi(e.iter(), columns));
Expr::Exists(exists) => {
outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns);
}
GroupingSet::Cube(exprs) | GroupingSet::Rollup(exprs) => {
outer_columns_helper_multi(exprs.iter(), columns);
Expr::InSubquery(insubquery) => {
outer_columns_helper_multi(
&insubquery.subquery.outer_ref_columns,
columns,
);
}
},
Expr::ScalarFunction(scalar_fn) => {
outer_columns_helper_multi(scalar_fn.args.iter(), columns);
}
Expr::Like(like) => {
outer_columns_helper(&like.expr, columns);
outer_columns_helper(&like.pattern, columns);
}
Expr::InList(in_list) => {
outer_columns_helper(&in_list.expr, columns);
outer_columns_helper_multi(in_list.list.iter(), columns);
}
Expr::Case(case) => {
let when_then_exprs = case
.when_then_expr
.iter()
.flat_map(|(first, second)| [first.as_ref(), second.as_ref()]);
outer_columns_helper_multi(when_then_exprs, columns);
if let Some(expr) = case.expr.as_ref() {
outer_columns_helper(expr, columns);
}
if let Some(expr) = case.else_expr.as_ref() {
outer_columns_helper(expr, columns);
}
}
Expr::SimilarTo(similar_to) => {
outer_columns_helper(&similar_to.expr, columns);
outer_columns_helper(&similar_to.pattern, columns);
}
Expr::TryCast(try_cast) => outer_columns_helper(&try_cast.expr, columns),
Expr::GetIndexedField(index) => outer_columns_helper(&index.expr, columns),
Expr::Between(between) => {
outer_columns_helper(&between.expr, columns);
outer_columns_helper(&between.low, columns);
outer_columns_helper(&between.high, columns);
}
Expr::Not(expr)
| Expr::IsNotFalse(expr)
| Expr::IsFalse(expr)
| Expr::IsTrue(expr)
| Expr::IsNotTrue(expr)
| Expr::IsUnknown(expr)
| Expr::IsNotUnknown(expr)
| Expr::IsNotNull(expr)
| Expr::IsNull(expr)
| Expr::Negative(expr) => outer_columns_helper(expr, columns),
Expr::Column(_)
| Expr::Literal(_)
| Expr::Wildcard { .. }
| Expr::ScalarVariable { .. }
| Expr::Placeholder(_) => (),
}
_ => {}
};
Ok(()) as Result<()>
})
// unwrap: closure above never returns Err, so can not be Err here
.unwrap();
}

/// A recursive subroutine that accumulates outer-referenced columns by the
Expand All @@ -708,10 +619,10 @@ fn outer_columns_helper(expr: &Expr, columns: &mut HashSet<Column>) {
/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
/// columns are collected.
fn outer_columns_helper_multi<'a>(
exprs: impl Iterator<Item = &'a Expr>,
exprs: impl IntoIterator<Item = &'a Expr>,
columns: &mut HashSet<Column>,
) {
exprs.for_each(|e| outer_columns_helper(e, columns));
exprs.into_iter().for_each(|e| outer_columns(e, columns));
}

/// Generates the required expressions (columns) that reside at `indices` of
Expand Down Expand Up @@ -779,8 +690,8 @@ fn indices_referred_by_expr(
expr: &Expr,
) -> Result<Vec<usize>> {
let mut cols = expr.to_columns()?;
// Get outer-referenced columns:
cols.extend(outer_columns(expr));
// Get outer-referenced (subquery) columns:
outer_columns(expr, &mut cols);
Ok(cols
.iter()
.flat_map(|col| input_schema.index_of_column(col))
Expand Down