-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Support inferring new predicates to push down #15906
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -631,7 +631,10 @@ impl InferredPredicates { | |
Ok(true) | ||
) | ||
{ | ||
self.predicates.push(replace_col(predicate, replace_map)?); | ||
self.predicates | ||
.push(replace_col(predicate, replace_map, |col| { | ||
Expr::Column((*col).clone()) | ||
})?); | ||
} | ||
|
||
Ok(()) | ||
|
@@ -784,13 +787,14 @@ impl OptimizerRule for PushDownFilter { | |
|
||
// remove duplicated filters | ||
let child_predicates = split_conjunction_owned(child_filter.predicate); | ||
let new_predicates = parents_predicates | ||
let mut new_predicates = parents_predicates | ||
.into_iter() | ||
.chain(child_predicates) | ||
// use IndexSet to remove dupes while preserving predicate order | ||
.collect::<IndexSet<_>>() | ||
.into_iter() | ||
.collect::<Vec<_>>(); | ||
new_predicates = infer_predicates_from_equalities(new_predicates)?; | ||
|
||
let Some(new_predicate) = conjunction(new_predicates) else { | ||
return plan_err!("at least one expression exists"); | ||
|
@@ -1382,6 +1386,73 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool { | |
is_contain | ||
} | ||
|
||
/// Infers new predicates by substituting equalities. | ||
/// For example, with predicates `t2.b = 3` and `t1.b > t2.b`, | ||
/// we can infer `t1.b > 3`. | ||
fn infer_predicates_from_equalities(predicates: Vec<Expr>) -> Result<Vec<Expr>> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the future, we can move the code into a dedicated optimizer rule, such as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this might be a special case of the range analysis code in https://docs.rs/datafusion/latest/datafusion/physical_expr/intervals/cp_solver/index.html In other words, instead of this special case maybe we could use the cp_solver to create a more general framework for introducing inferred predicates 🤔 Now that we have predicate pushdown for ExecutionPlans maybe it is more realistic to do this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll check the cp_solver (didn't notice the part of code before) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great suggestion. I can help if you need some directions or have any confusion |
||
// Map from column names to their literal values (from equality predicates) | ||
let mut equality_map: HashMap<Column, Expr> = | ||
HashMap::with_capacity(predicates.len()); | ||
let mut final_predicates = Vec::with_capacity(predicates.len()); | ||
// First pass: collect column=literal equalities | ||
for predicate in predicates.iter() { | ||
if let Expr::BinaryExpr(BinaryExpr { | ||
left, | ||
op: Operator::Eq, | ||
right, | ||
}) = predicate | ||
{ | ||
if let Expr::Column(col) = left.as_ref() { | ||
// Only add to map if right side is a literal | ||
if matches!(right.as_ref(), Expr::Literal(_)) { | ||
equality_map.insert(col.clone(), *right.clone()); | ||
final_predicates.push(predicate.clone()); | ||
} | ||
} else if let Expr::Column(col) = right.as_ref() { | ||
// Only add to map if left side is a literal | ||
if matches!(left.as_ref(), Expr::Literal(_)) { | ||
equality_map.insert(col.clone(), *right.clone()); | ||
final_predicates.push(predicate.clone()); | ||
} | ||
} | ||
} | ||
} | ||
|
||
// If no equality mappings found, nothing to infer | ||
if equality_map.is_empty() { | ||
return Ok(predicates); | ||
} | ||
|
||
// Second pass: apply substitutions to create new predicates | ||
for predicate in predicates { | ||
// Skip equality predicates we already used for mapping | ||
if final_predicates.contains(&predicate) { | ||
continue; | ||
} | ||
|
||
// Try to replace columns with their literal values | ||
let mut columns_in_expr = HashSet::new(); | ||
expr_to_columns(&predicate, &mut columns_in_expr)?; | ||
|
||
// Create a combined replacement map for all columns in this predicate | ||
let replace_map: HashMap<_, _> = columns_in_expr | ||
.iter() | ||
.filter_map(|col| equality_map.get(col).map(|lit| (col, lit))) | ||
.collect(); | ||
|
||
if replace_map.is_empty() { | ||
final_predicates.push(predicate); | ||
continue; | ||
} | ||
// Apply all substitutions at once to get the fully substituted predicate | ||
let new_pred = replace_col(predicate, &replace_map, |e| (*e).clone())?; | ||
|
||
final_predicates.push(new_pred); | ||
} | ||
|
||
Ok(final_predicates) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::any::Any; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -259,3 +259,35 @@ logical_plan TableScan: t projection=[a], full_filters=[CAST(t.a AS Utf8) = Utf8 | |
|
||
statement ok | ||
drop table t; | ||
|
||
statement ok | ||
create table t1(a int, b int) as values(1, 2), (2, 3), (3 ,4); | ||
|
||
statement ok | ||
create table t2(a int, b int) as values (1, 2), (2, 4), (4, 5); | ||
|
||
query TT | ||
explain select | ||
* | ||
from | ||
t1 | ||
join t2 on t1.a = t2.a | ||
and t1.b between t2.b | ||
and t2.b + 2 | ||
where | ||
t2.b = 3 | ||
---- | ||
logical_plan | ||
01)Inner Join: t1.a = t2.a | ||
02)--Projection: t1.a, t1.b | ||
03)----Filter: __common_expr_4 >= Int64(3) AND __common_expr_4 <= Int64(5) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The inferred predicate, which can be pushed down to the t1 scan. |
||
04)------Projection: CAST(t1.b AS Int64) AS __common_expr_4, t1.a, t1.b | ||
05)--------TableScan: t1 projection=[a, b] | ||
06)--Filter: t2.b = Int32(3) | ||
07)----TableScan: t2 projection=[a, b] | ||
|
||
statement ok | ||
drop table t1; | ||
|
||
statement ok | ||
drop table t2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't wanna write a similar method for the PR, so made the method generic