Skip to content

Commit 4bc66c8

Browse files
adriangb and alamb
authored
Refactor filter pushdown APIs to enable joins to pass through filters (#16732)
* refactor filter pushdown to remove assumptions blocking hashjoinexec pushdown
* fix assertion
* fix
* fix any/all
* remove hashjoinexec impl
* lint
* resolve merge
* fix imports
* fix merge
* fix typo
* lint
* Improve some documentation in filter pushdown (#32)
* refactor to a struct with a field instead of enum
* better docs, rename methods
* fix cargo doc

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent eb4f852 commit 4bc66c8

File tree

11 files changed

+346
-230
lines changed

11 files changed

+346
-230
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ use datafusion_datasource::{
2929
use datafusion_physical_expr::conjunction;
3030
use datafusion_physical_expr_common::physical_expr::fmt_sql;
3131
use datafusion_physical_optimizer::PhysicalOptimizerRule;
32-
use datafusion_physical_plan::filter_pushdown::FilterPushdownPhase;
32+
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, PushedDown};
3333
use datafusion_physical_plan::{
3434
displayable,
3535
filter::FilterExec,
3636
filter_pushdown::{
3737
ChildFilterDescription, ChildPushdownResult, FilterDescription,
38-
FilterPushdownPropagation, PredicateSupport,
38+
FilterPushdownPropagation,
3939
},
4040
metrics::ExecutionPlanMetricsSet,
4141
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
@@ -227,19 +227,13 @@ impl FileSource for TestSource {
227227
predicate: Some(conjunction(filters.clone())),
228228
..self.clone()
229229
});
230-
Ok(FilterPushdownPropagation {
231-
filters: filters
232-
.into_iter()
233-
.map(PredicateSupport::Supported)
234-
.collect(),
235-
updated_node: Some(new_node),
236-
})
230+
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
231+
vec![PushedDown::Yes; filters.len()],
232+
)
233+
.with_updated_node(new_node))
237234
} else {
238-
Ok(FilterPushdownPropagation::with_filters(
239-
filters
240-
.into_iter()
241-
.map(PredicateSupport::Unsupported)
242-
.collect(),
235+
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
236+
vec![PushedDown::No; filters.len()],
243237
))
244238
}
245239
}
@@ -547,26 +541,29 @@ impl ExecutionPlan for TestNode {
547541
assert_eq!(self_pushdown_result.len(), 1);
548542
let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect();
549543

550-
match &self_pushdown_result[0] {
551-
PredicateSupport::Unsupported(filter) => {
544+
let first_pushdown_result = self_pushdown_result[0].clone();
545+
546+
match &first_pushdown_result.discriminant {
547+
PushedDown::No => {
552548
// We have a filter to push down
553-
let new_child =
554-
FilterExec::try_new(Arc::clone(filter), Arc::clone(&self.input))?;
549+
let new_child = FilterExec::try_new(
550+
Arc::clone(&first_pushdown_result.predicate),
551+
Arc::clone(&self.input),
552+
)?;
555553
let new_self =
556554
TestNode::new(false, Arc::new(new_child), self.predicate.clone());
557555
let mut res =
558-
FilterPushdownPropagation::transparent(child_pushdown_result);
556+
FilterPushdownPropagation::if_all(child_pushdown_result);
559557
res.updated_node = Some(Arc::new(new_self) as Arc<dyn ExecutionPlan>);
560558
Ok(res)
561559
}
562-
PredicateSupport::Supported(_) => {
563-
let res =
564-
FilterPushdownPropagation::transparent(child_pushdown_result);
560+
PushedDown::Yes => {
561+
let res = FilterPushdownPropagation::if_all(child_pushdown_result);
565562
Ok(res)
566563
}
567564
}
568565
} else {
569-
let res = FilterPushdownPropagation::transparent(child_pushdown_result);
566+
let res = FilterPushdownPropagation::if_all(child_pushdown_result);
570567
Ok(res)
571568
}
572569
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ use datafusion_datasource::file_scan_config::FileScanConfig;
4141
use datafusion_physical_expr::conjunction;
4242
use datafusion_physical_expr_common::physical_expr::fmt_sql;
4343
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
44+
use datafusion_physical_plan::filter_pushdown::PushedDown;
4445
use datafusion_physical_plan::filter_pushdown::{
45-
FilterPushdownPropagation, PredicateSupport,
46+
FilterPushdownPropagation, PushedDownPredicate,
4647
};
4748
use datafusion_physical_plan::metrics::Count;
4849
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -622,11 +623,8 @@ impl FileSource for ParquetSource {
622623
config: &ConfigOptions,
623624
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
624625
let Some(file_schema) = self.file_schema.clone() else {
625-
return Ok(FilterPushdownPropagation::with_filters(
626-
filters
627-
.into_iter()
628-
.map(PredicateSupport::Unsupported)
629-
.collect(),
626+
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
627+
vec![PushedDown::No; filters.len()],
630628
));
631629
};
632630
// Determine if based on configs we should push filters down.
@@ -641,29 +639,31 @@ impl FileSource for ParquetSource {
641639
let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;
642640

643641
let mut source = self.clone();
644-
let filters: Vec<PredicateSupport> = filters
642+
let filters: Vec<PushedDownPredicate> = filters
645643
.into_iter()
646644
.map(|filter| {
647645
if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
648-
PredicateSupport::Supported(filter)
646+
PushedDownPredicate::supported(filter)
649647
} else {
650-
PredicateSupport::Unsupported(filter)
648+
PushedDownPredicate::unsupported(filter)
651649
}
652650
})
653651
.collect();
654652
if filters
655653
.iter()
656-
.all(|f| matches!(f, PredicateSupport::Unsupported(_)))
654+
.all(|f| matches!(f.discriminant, PushedDown::No))
657655
{
658656
// No filters can be pushed down, so we can just return the remaining filters
659657
// and avoid replacing the source in the physical plan.
660-
return Ok(FilterPushdownPropagation::with_filters(filters));
658+
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
659+
vec![PushedDown::No; filters.len()],
660+
));
661661
}
662662
let allowed_filters = filters
663663
.iter()
664-
.filter_map(|f| match f {
665-
PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
666-
PredicateSupport::Unsupported(_) => None,
664+
.filter_map(|f| match f.discriminant {
665+
PushedDown::Yes => Some(Arc::clone(&f.predicate)),
666+
PushedDown::No => None,
667667
})
668668
.collect_vec();
669669
let predicate = match source.predicate {
@@ -678,15 +678,15 @@ impl FileSource for ParquetSource {
678678
// If pushdown_filters is false we tell our parents that they still have to handle the filters,
679679
// even if we updated the predicate to include the filters (they will only be used for stats pruning).
680680
if !pushdown_filters {
681-
return Ok(FilterPushdownPropagation::with_filters(
682-
filters
683-
.into_iter()
684-
.map(|f| PredicateSupport::Unsupported(f.into_inner()))
685-
.collect_vec(),
681+
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
682+
vec![PushedDown::No; filters.len()],
686683
)
687684
.with_updated_node(source));
688685
}
689-
Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source))
686+
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
687+
filters.iter().map(|f| f.discriminant).collect(),
688+
)
689+
.with_updated_node(source))
690690
}
691691

692692
fn with_schema_adapter_factory(

datafusion/datasource/src/file.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ use arrow::datatypes::SchemaRef;
3030
use datafusion_common::config::ConfigOptions;
3131
use datafusion_common::{not_impl_err, Result, Statistics};
3232
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
33-
use datafusion_physical_plan::filter_pushdown::{
34-
FilterPushdownPropagation, PredicateSupport,
35-
};
33+
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
3634
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3735
use datafusion_physical_plan::DisplayFormatType;
3836

@@ -122,11 +120,8 @@ pub trait FileSource: Send + Sync {
122120
filters: Vec<Arc<dyn PhysicalExpr>>,
123121
_config: &ConfigOptions,
124122
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
125-
Ok(FilterPushdownPropagation::with_filters(
126-
filters
127-
.into_iter()
128-
.map(PredicateSupport::Unsupported)
129-
.collect(),
123+
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
124+
vec![PushedDown::No; filters.len()],
130125
))
131126
}
132127

datafusion/datasource/src/source.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use datafusion_physical_expr::{
4242
use datafusion_physical_expr_common::sort_expr::LexOrdering;
4343
use datafusion_physical_plan::filter::collect_columns_from_predicate;
4444
use datafusion_physical_plan::filter_pushdown::{
45-
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport,
45+
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
4646
};
4747

4848
/// A source of data, typically a list of files or memory
@@ -172,11 +172,8 @@ pub trait DataSource: Send + Sync + Debug {
172172
filters: Vec<Arc<dyn PhysicalExpr>>,
173173
_config: &ConfigOptions,
174174
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
175-
Ok(FilterPushdownPropagation::with_filters(
176-
filters
177-
.into_iter()
178-
.map(PredicateSupport::Unsupported)
179-
.collect(),
175+
Ok(FilterPushdownPropagation::with_parent_pushdown_result(
176+
vec![PushedDown::No; filters.len()],
180177
))
181178
}
182179
}
@@ -324,17 +321,14 @@ impl ExecutionPlan for DataSourceExec {
324321
config: &ConfigOptions,
325322
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
326323
// Push any remaining filters into our data source
327-
let res = self.data_source.try_pushdown_filters(
328-
child_pushdown_result
329-
.parent_filters
330-
.into_iter()
331-
.map(|f| match f {
332-
PredicateSupport::Supported(expr) => expr,
333-
PredicateSupport::Unsupported(expr) => expr,
334-
})
335-
.collect(),
336-
config,
337-
)?;
324+
let parent_filters = child_pushdown_result
325+
.parent_filters
326+
.into_iter()
327+
.map(|f| f.filter)
328+
.collect_vec();
329+
let res = self
330+
.data_source
331+
.try_pushdown_filters(parent_filters.clone(), config)?;
338332
match res.updated_node {
339333
Some(data_source) => {
340334
let mut new_node = self.clone();
@@ -346,9 +340,10 @@ impl ExecutionPlan for DataSourceExec {
346340
let filter = conjunction(
347341
res.filters
348342
.iter()
349-
.filter_map(|f| match f {
350-
PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
351-
PredicateSupport::Unsupported(_) => None,
343+
.zip(parent_filters)
344+
.filter_map(|(s, f)| match s {
345+
PushedDown::Yes => Some(f),
346+
PushedDown::No => None,
352347
})
353348
.collect_vec(),
354349
);

0 commit comments

Comments
 (0)