Skip to content

Commit 9741d87

Browse files
committed
Return Vec<bool> from PruningPredicateBuilder rather than an Fn
1 parent 2c6a23b commit 9741d87

File tree

2 files changed

+72
-50
lines changed

2 files changed

+72
-50
lines changed

datafusion/src/physical_optimizer/pruning.rs

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! This module contains code to rule out row groups / partitions /
19-
//! etc based on statistics prior in order to skip evaluating entire
20-
//! swaths of rows.
18+
//! This module contains code to prune "containers" of row groups
19+
//! based on statistics prior to execution. This can lead to
20+
//! significant performance improvements by avoiding the need
21+
//! to evaluate a plan on entire containers (e.g. an entire file)
22+
//!
23+
//! For example, it is used to prune (skip) row groups while reading
24+
//! parquet files if it can be determined from the predicate that
25+
//! nothing in the row group can match.
2126
//!
2227
//! This code is currently specific to Parquet, but soon (TM), via
2328
//! https://github.com/apache/arrow-datafusion/issues/363 it will
@@ -85,24 +90,24 @@ impl PruningPredicateBuilder {
8590
})
8691
}
8792

88-
/// Generate a predicate function used to filter based on
89-
/// statistics
93+
/// For each set of statistics, evaluates the predicate in this
94+
/// builder and returns a `bool` with the following meaning for a
95+
/// container with those statistics:
96+
///
97+
/// `true`: The container MAY contain rows that match the predicate
9098
///
91-
/// This function takes a slice of statistics as parameter, so
92-
/// that DataFusion's physical expressions can be executed once
93-
/// against a single RecordBatch, containing statistics arrays, on
94-
/// which the physical predicate expression is executed to
95-
/// generate a row group filter array.
99+
/// `false`: The container definitely does NOT contain rows that match the predicate
96100
///
97-
/// The generated filter function is then used in the returned
98-
/// closure to filter row groups. NOTE this is parquet specific at the moment
101+
/// Note this function takes a slice of statistics as a parameter
102+
/// to amortize the cost of the evaluation of the predicate
103+
/// against a single record batch.
99104
pub fn build_pruning_predicate(
100105
&self,
101-
row_group_metadata: &[RowGroupMetaData],
102-
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
106+
statistics: &[RowGroupMetaData],
107+
) -> Result<Vec<bool>> {
103108
// build statistics record batch
104-
let predicate_result = build_statistics_record_batch(
105-
row_group_metadata,
109+
let predicate_array = build_statistics_record_batch(
110+
statistics,
106111
&self.schema,
107112
&self.stat_column_req,
108113
)
@@ -112,49 +117,45 @@ impl PruningPredicateBuilder {
112117
})
113118
.and_then(|v| match v {
114119
ColumnarValue::Array(array) => Ok(array),
115-
ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
120+
ColumnarValue::Scalar(_) => Err(DataFusionError::Internal(
116121
"predicate expression didn't return an array".to_string(),
117122
)),
118-
});
119-
120-
let predicate_array = match predicate_result {
121-
Ok(array) => array,
122-
// row group filter array could not be built
123-
// return a closure which will not filter out any row groups
124-
_ => return Box::new(|_r, _i| true),
125-
};
123+
})?;
126124

127-
let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
128-
match predicate_array {
129-
// return row group predicate function
130-
Some(array) => {
131-
// when the result of the predicate expression for a row group is null / undefined,
132-
// e.g. due to missing statistics, this row group can't be filtered out,
133-
// so replace with true
134-
let predicate_values =
135-
array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
136-
Box::new(move |_, i| predicate_values[i])
137-
}
138-
// predicate result is not a BooleanArray
139-
// return a closure which will not filter out any row groups
140-
_ => Box::new(|_r, _i| true),
141-
}
125+
let predicate_array = predicate_array
126+
.as_any()
127+
.downcast_ref::<BooleanArray>()
128+
.ok_or_else(|| {
129+
DataFusionError::Internal(format!(
130+
"Expected pruning predicate evaluation to be BooleanArray, \
131+
but was {:?}",
132+
predicate_array
133+
))
134+
})?;
135+
136+
// when the result of the predicate expression for a row group is null / undefined,
137+
// e.g. due to missing statistics, this row group can't be filtered out,
138+
// so replace with true
139+
Ok(predicate_array
140+
.into_iter()
141+
.map(|x| x.unwrap_or(true))
142+
.collect::<Vec<_>>())
142143
}
143144
}
144145

145146
/// Build a RecordBatch from a list of statistics (currently parquet
146147
/// [`RowGroupMetadata`] structs), creating arrays, one for each
147148
/// statistics column, as requested in the stat_column_req parameter.
148149
fn build_statistics_record_batch(
149-
row_groups: &[RowGroupMetaData],
150+
statistics: &[RowGroupMetaData],
150151
schema: &Schema,
151152
stat_column_req: &[(String, StatisticsType, Field)],
152153
) -> Result<RecordBatch> {
153154
let mut fields = Vec::<Field>::new();
154155
let mut arrays = Vec::<ArrayRef>::new();
155156
for (column_name, statistics_type, stat_field) in stat_column_req {
156157
if let Some((column_index, _)) = schema.column_with_name(column_name) {
157-
let statistics = row_groups
158+
let statistics = statistics
158159
.iter()
159160
.map(|g| g.column(column_index).statistics())
160161
.collect::<Vec<_>>();

datafusion/src/physical_plan/parquet.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ use arrow::{
3838
error::{ArrowError, Result as ArrowResult},
3939
record_batch::RecordBatch,
4040
};
41-
use parquet::file::reader::{FileReader, SerializedFileReader};
41+
use parquet::file::{
42+
metadata::RowGroupMetaData,
43+
reader::{FileReader, SerializedFileReader},
44+
};
4245

4346
use fmt::Debug;
4447
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
@@ -454,6 +457,22 @@ fn send_result(
454457
Ok(())
455458
}
456459

460+
fn build_row_group_predicate(
461+
predicate_builder: &PruningPredicateBuilder,
462+
row_group_metadata: &[RowGroupMetaData],
463+
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
464+
let predicate_values = predicate_builder.build_pruning_predicate(row_group_metadata);
465+
466+
let predicate_values = match predicate_values {
467+
Ok(values) => values,
468+
// stats filter array could not be built
469+
// return a closure which will not filter out any row groups
470+
_ => return Box::new(|_r, _i| true),
471+
};
472+
473+
Box::new(move |_, i| predicate_values[i])
474+
}
475+
457476
fn read_files(
458477
filenames: &[String],
459478
projection: &[usize],
@@ -467,8 +486,10 @@ fn read_files(
467486
let file = File::open(&filename)?;
468487
let mut file_reader = SerializedFileReader::new(file)?;
469488
if let Some(predicate_builder) = predicate_builder {
470-
let row_group_predicate = predicate_builder
471-
.build_pruning_predicate(file_reader.metadata().row_groups());
489+
let row_group_predicate = build_row_group_predicate(
490+
predicate_builder,
491+
file_reader.metadata().row_groups(),
492+
);
472493
file_reader.filter_row_groups(&row_group_predicate);
473494
}
474495
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
@@ -643,7 +664,7 @@ mod tests {
643664
);
644665
let row_group_metadata = vec![rgm1, rgm2];
645666
let row_group_predicate =
646-
predicate_builder.build_pruning_predicate(&row_group_metadata);
667+
build_row_group_predicate(&predicate_builder, &row_group_metadata);
647668
let row_group_filter = row_group_metadata
648669
.iter()
649670
.enumerate()
@@ -673,7 +694,7 @@ mod tests {
673694
);
674695
let row_group_metadata = vec![rgm1, rgm2];
675696
let row_group_predicate =
676-
predicate_builder.build_pruning_predicate(&row_group_metadata);
697+
build_row_group_predicate(&predicate_builder, &row_group_metadata);
677698
let row_group_filter = row_group_metadata
678699
.iter()
679700
.enumerate()
@@ -718,7 +739,7 @@ mod tests {
718739
);
719740
let row_group_metadata = vec![rgm1, rgm2];
720741
let row_group_predicate =
721-
predicate_builder.build_pruning_predicate(&row_group_metadata);
742+
build_row_group_predicate(&predicate_builder, &row_group_metadata);
722743
let row_group_filter = row_group_metadata
723744
.iter()
724745
.enumerate()
@@ -733,7 +754,7 @@ mod tests {
733754
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
734755
let predicate_builder = PruningPredicateBuilder::try_new(&expr, schema)?;
735756
let row_group_predicate =
736-
predicate_builder.build_pruning_predicate(&row_group_metadata);
757+
build_row_group_predicate(&predicate_builder, &row_group_metadata);
737758
let row_group_filter = row_group_metadata
738759
.iter()
739760
.enumerate()
@@ -777,7 +798,7 @@ mod tests {
777798
);
778799
let row_group_metadata = vec![rgm1, rgm2];
779800
let row_group_predicate =
780-
predicate_builder.build_pruning_predicate(&row_group_metadata);
801+
build_row_group_predicate(&predicate_builder, &row_group_metadata);
781802
let row_group_filter = row_group_metadata
782803
.iter()
783804
.enumerate()

0 commit comments

Comments
 (0)