Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push down filter plan for non-unnest column #11019

Merged
merged 7 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,15 @@ pub enum LogicalPlan {
RecursiveQuery(RecursiveQuery),
}

impl Default for LogicalPlan {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

fn default() -> Self {
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
})
}
}

impl LogicalPlan {
/// Get a reference to the logical plan's schema
pub fn schema(&self) -> &DFSchemaRef {
Expand Down
84 changes: 51 additions & 33 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,9 @@ impl OptimizerRule for PushDownFilter {
insert_below(LogicalPlan::SubqueryAlias(subquery_alias), new_filter)
}
LogicalPlan::Projection(projection) => {
let predicates = split_conjunction_owned(filter.predicate.clone());
let (new_projection, keep_predicate) =
rewrite_projection(filter.predicate.clone(), projection)?;
rewrite_projection(predicates, projection)?;
if new_projection.transformed {
match keep_predicate {
None => Ok(new_projection),
Expand All @@ -709,41 +710,54 @@ impl OptimizerRule for PushDownFilter {
}
}
LogicalPlan::Unnest(mut unnest) => {
// collect all the Expr::Column in predicate recursively
let mut accum: HashSet<Column> = HashSet::new();
expr_to_columns(&filter.predicate, &mut accum)?;
let predicates = split_conjunction_owned(filter.predicate.clone());
let mut non_unnest_predicates = vec![];
let mut unnest_predicates = vec![];
for predicate in predicates {
// collect all the Expr::Column in predicate recursively
let mut accum: HashSet<Column> = HashSet::new();
expr_to_columns(&predicate, &mut accum)?;

if unnest.exec_columns.iter().any(|c| accum.contains(c)) {
unnest_predicates.push(predicate);
} else {
non_unnest_predicates.push(predicate);
}
}

if unnest.exec_columns.iter().any(|c| accum.contains(c)) {
// Unnest predicates should not be pushed down.
// If no non-unnest predicates exist, early return
if non_unnest_predicates.is_empty() {
filter.input = Arc::new(LogicalPlan::Unnest(unnest));
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

// Unnest is built above Projection, so we only take Projection into consideration
match unwrap_arc(unnest.input) {
LogicalPlan::Projection(projection) => {
let (new_projection, keep_predicate) =
rewrite_projection(filter.predicate.clone(), projection)?;
unnest.input = Arc::new(new_projection.data);

if new_projection.transformed {
match keep_predicate {
None => Ok(Transformed::yes(LogicalPlan::Unnest(unnest))),
Some(keep_predicate) => Ok(Transformed::yes(
LogicalPlan::Filter(Filter::try_new(
keep_predicate,
Arc::new(LogicalPlan::Unnest(unnest)),
)?),
)),
}
} else {
filter.input = Arc::new(LogicalPlan::Unnest(unnest));
Ok(Transformed::no(LogicalPlan::Filter(filter)))
}
}
child => {
filter.input = Arc::new(child);
Ok(Transformed::no(LogicalPlan::Filter(filter)))
}
// Push down non-unnest filter predicate
// Unnest
// Unenst Input (Projection)
// -> rewritten to
// Unnest
// Filter
// Unenst Input (Projection)

let unnest_input = std::mem::take(&mut unnest.input);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about directly cloning the input since it is an Arc? This way, we don't need to construct a Default LogicalPlan either.

Copy link
Contributor Author

@jayzhan211 jayzhan211 Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a plan to convert Arc<LogicalPlan> to Box<LogicalPlan>, but before that we need to make sure there is no clone exist, so I think we should try our best to avoid clone. It is also the reason why we have rewrite in Logical optimizer rule that takes LogicalPlan instead of Arc<LogicalPlan>

Ref: #9637 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's a purpose for doing this, make sense to me. Thanks @jayzhan211


let filter_with_unnest_input = LogicalPlan::Filter(Filter::try_new(
conjunction(non_unnest_predicates).unwrap(), // Safe to unwrap since non_unnest_predicates is not empty.
unnest_input,
)?);

// try push down recursively
let new_plan = self.rewrite(filter_with_unnest_input, _config)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the optimizer already handles recursion, so it might not be necessary to call rewrite here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree!


let unnest_plan =
insert_below(LogicalPlan::Unnest(unnest), new_plan.data)?;

match conjunction(unnest_predicates) {
None => Ok(unnest_plan),
Some(predicate) => Ok(Transformed::yes(LogicalPlan::Filter(
Filter::try_new(predicate, Arc::new(unnest_plan.data))?,
))),
}
}
LogicalPlan::Union(ref union) => {
Expand Down Expand Up @@ -958,6 +972,10 @@ impl OptimizerRule for PushDownFilter {
/// `plan` is a LogicalPlan for `projection` with possibly a new FilterExec below it.
/// `remaining_predicate` is any part of the predicate that could not be pushed down
///
/// # Args
/// - predicates: Split predicates like `[foo=5, bar=6]`
/// - projection: The target projection plan to push down the predicates
///
/// # Example
///
/// Pushing a predicate like `foo=5 AND bar=6` with an input plan like this:
Expand All @@ -974,7 +992,7 @@ impl OptimizerRule for PushDownFilter {
/// ...
/// ```
fn rewrite_projection(
predicate: Expr,
predicates: Vec<Expr>,
projection: Projection,
) -> Result<(Transformed<LogicalPlan>, Option<Expr>)> {
// A projection is filter-commutable if it do not contain volatile predicates or contain volatile
Expand All @@ -994,7 +1012,7 @@ fn rewrite_projection(

let mut push_predicates = vec![];
let mut keep_predicates = vec![];
for expr in split_conjunction_owned(predicate) {
for expr in predicates {
if contain(&expr, &volatile_map) {
keep_predicates.push(expr);
} else {
Expand Down
23 changes: 20 additions & 3 deletions datafusion/sqllogictest/test_files/push_down_filter.slt
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,34 @@ select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where
5 2

# Could push the filter (column1 = 2) down below unnest
# https://github.com/apache/datafusion/issues/11016
query TT
explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2;
----
logical_plan
01)Projection: unnest(v.column2) AS uc2, v.column1
02)--Filter: unnest(v.column2) > Int64(3) AND v.column1 = Int64(2)
02)--Filter: unnest(v.column2) > Int64(3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also update the comment a few lines above to reflect the issue is fixed?

# Could push the filter (column1 = 2) down below unnest
# https://github.com/apache/datafusion/issues/11016

03)----Unnest: lists[unnest(v.column2)] structs[]
04)------Projection: v.column2 AS unnest(v.column2), v.column1
05)--------TableScan: v projection=[column1, column2]
05)--------Filter: v.column1 = Int64(2)
06)----------TableScan: v projection=[column1, column2]

query II
select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
----
3 2
4 2
5 2

# only non-unnest filter in AND clause could be pushed down
query TT
explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2;
----
logical_plan
01)Projection: unnest(v.column2) AS uc2, v.column1
02)--Filter: unnest(v.column2) > Int64(3) OR v.column1 = Int64(2)
03)----Unnest: lists[unnest(v.column2)] structs[]
04)------Projection: v.column2 AS unnest(v.column2), v.column1
05)--------TableScan: v projection=[column1, column2]

statement ok
drop table v;
Expand Down