Skip to content

Commit 919ee98

Browse files
committed
rewrite predicates before pushing to union inputs
1 parent 09c67d5 commit 919ee98

File tree

1 file changed

+49
-5
lines changed

1 file changed

+49
-5
lines changed

datafusion/src/optimizer/filter_push_down.rs

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
1717
use crate::datasource::datasource::TableProviderFilterPushDown;
1818
use crate::execution::context::ExecutionProps;
19-
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection};
19+
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
2020
use crate::logical_plan::{
21-
and, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
21+
and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
2222
};
2323
use crate::logical_plan::{DFSchema, Expr};
2424
use crate::optimizer::optimizer::OptimizerRule;
@@ -394,8 +394,29 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
394394
// sort is filter-commutable
395395
push_down(&state, plan)
396396
}
397-
LogicalPlan::Union(_) => {
398-
// union all is filter-commutable
397+
LogicalPlan::Union(Union {
398+
inputs: _,
399+
schema,
400+
alias: _,
401+
}) => {
402+
// union changing all qualifiers while building logical plan so we need
403+
// to rewrite filters to push unqualified columns to inputs
404+
let projection = schema
405+
.fields()
406+
.iter()
407+
.map(|field| (field.qualified_name(), col(field.name())))
408+
.collect::<HashMap<_, _>>();
409+
410+
// rewriting predicate expressions using unqualified names as replacements
411+
if !projection.is_empty() {
412+
for (predicate, columns) in state.filters.iter_mut() {
413+
*predicate = rewrite(predicate, &projection)?;
414+
415+
columns.clear();
416+
utils::expr_to_columns(predicate, columns)?;
417+
}
418+
}
419+
399420
push_down(&state, plan)
400421
}
401422
LogicalPlan::Limit(Limit { input, .. }) => {
@@ -574,7 +595,9 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {
574595
mod tests {
575596
use super::*;
576597
use crate::datasource::TableProvider;
577-
use crate::logical_plan::{lit, sum, DFSchema, Expr, LogicalPlanBuilder, Operator};
598+
use crate::logical_plan::{
599+
lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator,
600+
};
578601
use crate::physical_plan::ExecutionPlan;
579602
use crate::test::*;
580603
use crate::{logical_plan::col, prelude::JoinType};
@@ -901,6 +924,27 @@ mod tests {
901924
Ok(())
902925
}
903926

927+
#[test]
928+
fn union_all_with_alias() -> Result<()> {
929+
let table_scan = test_table_scan()?;
930+
let union =
931+
union_with_alias(table_scan.clone(), table_scan, Some("t".to_string()))?;
932+
933+
let plan = LogicalPlanBuilder::from(union)
934+
.filter(col("t.a").eq(lit(1i64)))?
935+
.build()?;
936+
937+
// filter appears below Union without relation qualifier
938+
let expected = "\
939+
Union\
940+
\n Filter: #a = Int64(1)\
941+
\n TableScan: test projection=None\
942+
\n Filter: #a = Int64(1)\
943+
\n TableScan: test projection=None";
944+
assert_optimized_plan_eq(&plan, expected);
945+
Ok(())
946+
}
947+
904948
/// verifies that filters with the same columns are correctly placed
905949
#[test]
906950
fn filter_2_breaks_limits() -> Result<()> {

0 commit comments

Comments
 (0)