Skip to content

Commit cce16e3

Browse files
committed
incorporate suggested comment, use new conjunction function
1 parent 7f0c894 commit cce16e3

File tree

1 file changed

+15
-10
lines changed

1 file changed

+15
-10
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3636
use arrow::datatypes::{Field, Schema, SchemaRef};
3737
use arrow::error::ArrowError;
3838
use datafusion_common::{exec_err, Result};
39-
use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column};
39+
use datafusion_physical_expr::expressions::{lit, Column};
40+
use datafusion_physical_expr::utils::conjunction;
4041
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4142
use datafusion_physical_optimizer::pruning::PruningPredicate;
4243
use datafusion_physical_plan::dynamic_filters::DynamicFilterSource;
@@ -118,10 +119,18 @@ impl FileOpener for ParquetOpener {
118119
.into_iter()
119120
.flatten()
120121
.collect::<Vec<_>>();
121-
// Collect dynamic_filters into a single predicate by reducing with AND
122-
let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
123-
Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
124-
});
122+
// Using the AND operator to combine the dynamic filters with the static predicate means that
123+
// a row (or a row group) must satisfy both conditions before it's read from disk.
124+
// The approach assumes that the static predicate and dynamic filters are independent and complementary.
125+
// In other words, the dynamic filters are not meant to replace or override the original predicate; they refine the set of rows even further.
126+
// If they were combined using OR, you might end up with more rows than necessary, which would negate the benefits of dynamic filtering.
127+
// Since the dynamic filters are calculated at runtime, they might sometimes be conservative estimates.
128+
// By combining them with AND, the system errs on the side of safety—only excluding data when it’s reasonably certain that the rows won’t match the overall query conditions.
129+
let dynamic_predicate = if dynamic_filters.is_empty() {
130+
None
131+
} else {
132+
Some(conjunction(dynamic_filters))
133+
};
125134
let enable_page_index = should_enable_page_index(
126135
self.enable_page_index,
127136
&self.page_pruning_predicate,
@@ -131,11 +140,7 @@ impl FileOpener for ParquetOpener {
131140
let predicate = match (predicate, dynamic_predicate) {
132141
(Some(p), None) => Some(p),
133142
(None, Some(d)) => Some(d),
134-
(Some(p), Some(d)) => Some(Arc::new(BinaryExpr::new(
135-
p,
136-
datafusion_expr::Operator::And,
137-
d,
138-
)) as Arc<dyn PhysicalExpr>),
143+
(Some(p), Some(d)) => Some(conjunction([p, d])),
139144
(None, None) => None,
140145
};
141146

0 commit comments

Comments
 (0)