Skip to content

Commit 738816d

Browse files
committed
Support inferring new predicates to push down
1 parent 2d1062f commit 738816d

File tree

3 files changed

+93
-3
lines changed

3 files changed

+93
-3
lines changed

datafusion/expr/src/expr_rewriter/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,25 @@ pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Resul
148148
.data()
149149
}
150150

151+
pub fn replace_col_with_expr(
152+
expr: Expr,
153+
replace_map: &HashMap<Column, &Expr>,
154+
) -> Result<Expr> {
155+
expr.transform(|expr| {
156+
Ok({
157+
if let Expr::Column(c) = &expr {
158+
match replace_map.get(c) {
159+
Some(new_expr) => Transformed::yes((**new_expr).to_owned()),
160+
None => Transformed::no(expr),
161+
}
162+
} else {
163+
Transformed::no(expr)
164+
}
165+
})
166+
})
167+
.data()
168+
}
169+
151170
/// Recursively 'unnormalize' (remove all qualifiers) from an
152171
/// expression tree.
153172
///

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use datafusion_common::{
3030
internal_err, plan_err, qualified_name, Column, DFSchema, Result,
3131
};
3232
use datafusion_expr::expr::WindowFunction;
33-
use datafusion_expr::expr_rewriter::replace_col;
33+
use datafusion_expr::expr_rewriter::{replace_col, replace_col_with_expr};
3434
use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, TableScan, Union};
3535
use datafusion_expr::utils::{
3636
conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
@@ -784,14 +784,16 @@ impl OptimizerRule for PushDownFilter {
784784

785785
// remove duplicated filters
786786
let child_predicates = split_conjunction_owned(child_filter.predicate);
787-
let new_predicates = parents_predicates
787+
let mut new_predicates = parents_predicates
788788
.into_iter()
789789
.chain(child_predicates)
790790
// use IndexSet to remove dupes while preserving predicate order
791791
.collect::<IndexSet<_>>()
792792
.into_iter()
793793
.collect::<Vec<_>>();
794794

795+
new_predicates = infer_predicates_from_equalities(new_predicates)?;
796+
795797
let Some(new_predicate) = conjunction(new_predicates) else {
796798
return plan_err!("at least one expression exists");
797799
};
@@ -1382,6 +1384,73 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool {
13821384
is_contain
13831385
}
13841386

1387+
/// Infers new predicates by substituting equalities.
1388+
/// For example, with predicates `t2.b = 3` and `t1.b > t2.b`,
1389+
/// we can infer `t1.b > 3`.
1390+
fn infer_predicates_from_equalities(predicates: Vec<Expr>) -> Result<Vec<Expr>> {
1391+
// Map from column names to their literal values (from equality predicates)
1392+
let mut equality_map: HashMap<Column, Expr> =
1393+
HashMap::with_capacity(predicates.len());
1394+
let mut final_predicates = Vec::with_capacity(predicates.len());
1395+
// First pass: collect column=literal equalities
1396+
for predicate in predicates.iter() {
1397+
if let Expr::BinaryExpr(BinaryExpr {
1398+
left,
1399+
op: Operator::Eq,
1400+
right,
1401+
}) = predicate
1402+
{
1403+
if let Expr::Column(col) = left.as_ref() {
1404+
// Only add to map if right side is a literal
1405+
if matches!(right.as_ref(), Expr::Literal(_)) {
1406+
equality_map.insert(col.clone(), *right.clone());
1407+
final_predicates.push(predicate.clone());
1408+
}
1409+
} else if let Expr::Column(col) = right.as_ref() {
1410+
// Only add to map if left side is a literal
1411+
if matches!(left.as_ref(), Expr::Literal(_)) {
1412+
equality_map.insert(col.clone(), *right.clone());
1413+
final_predicates.push(predicate.clone());
1414+
}
1415+
}
1416+
}
1417+
}
1418+
1419+
// If no equality mappings found, nothing to infer
1420+
if equality_map.is_empty() {
1421+
return Ok(predicates);
1422+
}
1423+
1424+
// Second pass: apply substitutions to create new predicates
1425+
for predicate in predicates {
1426+
// Skip equality predicates we already used for mapping
1427+
if final_predicates.contains(&predicate) {
1428+
continue;
1429+
}
1430+
1431+
// Try to replace columns with their literal values
1432+
let mut columns_in_expr = HashSet::new();
1433+
expr_to_columns(&predicate, &mut columns_in_expr)?;
1434+
1435+
// Create a combined replacement map for all columns in this predicate
1436+
let replace_map: HashMap<_, _> = columns_in_expr
1437+
.into_iter()
1438+
.filter_map(|col| equality_map.get(&col).map(|lit| (col, lit)))
1439+
.collect();
1440+
1441+
if replace_map.is_empty() {
1442+
final_predicates.push(predicate);
1443+
continue;
1444+
}
1445+
// Apply all substitutions at once to get the fully substituted predicate
1446+
let new_pred = replace_col_with_expr(predicate, &replace_map)?;
1447+
1448+
final_predicates.push(new_pred);
1449+
}
1450+
1451+
Ok(final_predicates)
1452+
}
1453+
13851454
#[cfg(test)]
13861455
mod tests {
13871456
use std::any::Any;

datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,9 @@ fn plan_with_order_breaking_variants(
197197
// Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`
198198
// SPM may have `fetch`, so pass it to the `CoalescePartitionsExec`
199199
let child = Arc::clone(&sort_input.children[0].plan);
200-
let coalesce = CoalescePartitionsExec::new(child).with_fetch(plan.fetch()).unwrap();
200+
let coalesce = CoalescePartitionsExec::new(child)
201+
.with_fetch(plan.fetch())
202+
.unwrap();
201203
sort_input.plan = coalesce;
202204
} else {
203205
return sort_input.update_plan_from_children();

0 commit comments

Comments
 (0)