-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Push down filter plan for non-unnest column #11019
Changes from 6 commits
e1f9ccc
1ddc00e
592474d
2eaa520
adbb108
33c3d8b
930bf67
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -693,8 +693,9 @@ impl OptimizerRule for PushDownFilter { | |
insert_below(LogicalPlan::SubqueryAlias(subquery_alias), new_filter) | ||
} | ||
LogicalPlan::Projection(projection) => { | ||
let predicates = split_conjunction_owned(filter.predicate.clone()); | ||
let (new_projection, keep_predicate) = | ||
rewrite_projection(filter.predicate.clone(), projection)?; | ||
rewrite_projection(predicates, projection)?; | ||
if new_projection.transformed { | ||
match keep_predicate { | ||
None => Ok(new_projection), | ||
|
@@ -709,41 +710,54 @@ impl OptimizerRule for PushDownFilter { | |
} | ||
} | ||
LogicalPlan::Unnest(mut unnest) => { | ||
// collect all the Expr::Column in predicate recursively | ||
let mut accum: HashSet<Column> = HashSet::new(); | ||
expr_to_columns(&filter.predicate, &mut accum)?; | ||
let predicates = split_conjunction_owned(filter.predicate.clone()); | ||
let mut non_unnest_predicates = vec![]; | ||
let mut unnest_predicates = vec![]; | ||
for predicate in predicates { | ||
// collect all the Expr::Column in predicate recursively | ||
let mut accum: HashSet<Column> = HashSet::new(); | ||
expr_to_columns(&predicate, &mut accum)?; | ||
|
||
if unnest.exec_columns.iter().any(|c| accum.contains(c)) { | ||
unnest_predicates.push(predicate); | ||
} else { | ||
non_unnest_predicates.push(predicate); | ||
} | ||
} | ||
|
||
if unnest.exec_columns.iter().any(|c| accum.contains(c)) { | ||
// Unnest predicates should not be pushed down. | ||
// If no non-unnest predicates exist, early return | ||
if non_unnest_predicates.is_empty() { | ||
filter.input = Arc::new(LogicalPlan::Unnest(unnest)); | ||
return Ok(Transformed::no(LogicalPlan::Filter(filter))); | ||
} | ||
|
||
// Unnest is built above Projection, so we only take Projection into consideration | ||
match unwrap_arc(unnest.input) { | ||
LogicalPlan::Projection(projection) => { | ||
let (new_projection, keep_predicate) = | ||
rewrite_projection(filter.predicate.clone(), projection)?; | ||
unnest.input = Arc::new(new_projection.data); | ||
|
||
if new_projection.transformed { | ||
match keep_predicate { | ||
None => Ok(Transformed::yes(LogicalPlan::Unnest(unnest))), | ||
Some(keep_predicate) => Ok(Transformed::yes( | ||
LogicalPlan::Filter(Filter::try_new( | ||
keep_predicate, | ||
Arc::new(LogicalPlan::Unnest(unnest)), | ||
)?), | ||
)), | ||
} | ||
} else { | ||
filter.input = Arc::new(LogicalPlan::Unnest(unnest)); | ||
Ok(Transformed::no(LogicalPlan::Filter(filter))) | ||
} | ||
} | ||
child => { | ||
filter.input = Arc::new(child); | ||
Ok(Transformed::no(LogicalPlan::Filter(filter))) | ||
} | ||
// Push down non-unnest filter predicate | ||
// Unnest | ||
// Unenst Input (Projection) | ||
// -> rewritten to | ||
// Unnest | ||
// Filter | ||
// Unenst Input (Projection) | ||
|
||
let unnest_input = std::mem::take(&mut unnest.input); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about directly cloning the input since it is an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have a plan to convert Ref: #9637 (comment) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since there's a purpose for doing this, make sense to me. Thanks @jayzhan211 |
||
|
||
let filter_with_unnest_input = LogicalPlan::Filter(Filter::try_new( | ||
conjunction(non_unnest_predicates).unwrap(), // Safe to unwrap since non_unnest_predicates is not empty. | ||
unnest_input, | ||
)?); | ||
|
||
// try push down recursively | ||
let new_plan = self.rewrite(filter_with_unnest_input, _config)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the optimizer already handles recursion, so it might not be necessary to call rewrite here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree! |
||
|
||
let unnest_plan = | ||
insert_below(LogicalPlan::Unnest(unnest), new_plan.data)?; | ||
|
||
match conjunction(unnest_predicates) { | ||
None => Ok(unnest_plan), | ||
Some(predicate) => Ok(Transformed::yes(LogicalPlan::Filter( | ||
Filter::try_new(predicate, Arc::new(unnest_plan.data))?, | ||
))), | ||
} | ||
} | ||
LogicalPlan::Union(ref union) => { | ||
|
@@ -958,6 +972,10 @@ impl OptimizerRule for PushDownFilter { | |
/// `plan` is a LogicalPlan for `projection` with possibly a new FilterExec below it. | ||
/// `remaining_predicate` is any part of the predicate that could not be pushed down | ||
/// | ||
/// # Args | ||
/// - predicates: Split predicates like `[foo=5, bar=6]` | ||
/// - projection: The target projection plan to push down the predicates | ||
/// | ||
/// # Example | ||
/// | ||
/// Pushing a predicate like `foo=5 AND bar=6` with an input plan like this: | ||
|
@@ -974,7 +992,7 @@ impl OptimizerRule for PushDownFilter { | |
/// ... | ||
/// ``` | ||
fn rewrite_projection( | ||
predicate: Expr, | ||
predicates: Vec<Expr>, | ||
projection: Projection, | ||
) -> Result<(Transformed<LogicalPlan>, Option<Expr>)> { | ||
// A projection is filter-commutable if it do not contain volatile predicates or contain volatile | ||
|
@@ -994,7 +1012,7 @@ fn rewrite_projection( | |
|
||
let mut push_predicates = vec![]; | ||
let mut keep_predicates = vec![]; | ||
for expr in split_conjunction_owned(predicate) { | ||
for expr in predicates { | ||
if contain(&expr, &volatile_map) { | ||
keep_predicates.push(expr); | ||
} else { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,17 +67,34 @@ select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where | |
5 2 | ||
|
||
# Could push the filter (column1 = 2) down below unnest | ||
# https://github.com/apache/datafusion/issues/11016 | ||
query TT | ||
explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 AND column1 = 2; | ||
---- | ||
logical_plan | ||
01)Projection: unnest(v.column2) AS uc2, v.column1 | ||
02)--Filter: unnest(v.column2) > Int64(3) AND v.column1 = Int64(2) | ||
02)--Filter: unnest(v.column2) > Int64(3) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please also update the comment a few lines above to reflect the issue is fixed?
|
||
03)----Unnest: lists[unnest(v.column2)] structs[] | ||
04)------Projection: v.column2 AS unnest(v.column2), v.column1 | ||
05)--------TableScan: v projection=[column1, column2] | ||
05)--------Filter: v.column1 = Int64(2) | ||
06)----------TableScan: v projection=[column1, column2] | ||
|
||
query II | ||
select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2; | ||
---- | ||
3 2 | ||
4 2 | ||
5 2 | ||
|
||
# only non-unnest filter in AND clause could be pushed down | ||
query TT | ||
explain select uc2, column1 from (select unnest(column2) as uc2, column1 from v) where uc2 > 3 OR column1 = 2; | ||
---- | ||
logical_plan | ||
01)Projection: unnest(v.column2) AS uc2, v.column1 | ||
02)--Filter: unnest(v.column2) > Int64(3) OR v.column1 = Int64(2) | ||
03)----Unnest: lists[unnest(v.column2)] structs[] | ||
04)------Projection: v.column2 AS unnest(v.column2), v.column1 | ||
05)--------TableScan: v projection=[column1, column2] | ||
|
||
statement ok | ||
drop table v; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍