Skip to content

Commit 438eac0

Browse files
committed
Add parquet predicate pushdown tests with smaller pages
1 parent d77d71c commit 438eac0

File tree

5 files changed

+193
-46
lines changed

5 files changed

+193
-46
lines changed

benchmarks/src/bin/parquet_filter_pushdown.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use datafusion::logical_expr::{lit, or, Expr};
2121
use datafusion::optimizer::utils::disjunction;
2222
use datafusion::physical_plan::collect;
2323
use datafusion::prelude::{col, SessionConfig, SessionContext};
24+
use parquet::file::properties::WriterProperties;
2425
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
2526
use std::path::PathBuf;
2627
use std::time::Instant;
@@ -73,7 +74,19 @@ async fn main() -> Result<()> {
7374

7475
let path = opt.path.join("logs.parquet");
7576

76-
let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
77+
let mut props_builder = WriterProperties::builder();
78+
79+
if let Some(s) = opt.page_size {
80+
props_builder = props_builder
81+
.set_data_pagesize_limit(s)
82+
.set_write_batch_size(s);
83+
}
84+
85+
if let Some(s) = opt.row_group_size {
86+
props_builder = props_builder.set_max_row_group_size(s);
87+
}
88+
89+
let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;
7790

7891
run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
7992

@@ -179,17 +192,11 @@ async fn exec_scan(
179192
fn gen_data(
180193
path: PathBuf,
181194
scale_factor: f32,
182-
page_size: Option<usize>,
183-
row_group_size: Option<usize>,
195+
props: WriterProperties,
184196
) -> Result<TestParquetFile> {
185197
let generator = AccessLogGenerator::new();
186198

187199
let num_batches = 100_f32 * scale_factor;
188200

189-
TestParquetFile::try_new(
190-
path,
191-
generator.take(num_batches as usize),
192-
page_size,
193-
row_group_size,
194-
)
201+
TestParquetFile::try_new(path, props, generator.take(num_batches as usize))
195202
}

datafusion/core/src/physical_optimizer/pruning.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
5151
use datafusion_expr::utils::expr_to_columns;
5252
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
5353
use datafusion_physical_expr::create_physical_expr;
54+
use log::trace;
5455

5556
/// Interface to pass statistics information to [`PruningPredicate`]
5657
///
@@ -415,6 +416,12 @@ fn build_statistics_record_batch<S: PruningStatistics>(
415416
let mut options = RecordBatchOptions::default();
416417
options.row_count = Some(statistics.num_containers());
417418

419+
trace!(
420+
"Creating statistics batch for {:#?} with {:#?}",
421+
required_columns,
422+
arrays
423+
);
424+
418425
RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
419426
DataFusionError::Plan(format!("Can not create statistics record batch: {}", err))
420427
})

datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
2222
use datafusion_common::{Column, DataFusionError, Result};
2323
use datafusion_expr::utils::expr_to_columns;
2424
use datafusion_optimizer::utils::split_conjunction;
25-
use log::{debug, error};
25+
use log::{debug, error, trace};
2626
use parquet::{
2727
arrow::arrow_reader::{RowSelection, RowSelector},
2828
errors::ParquetError,
@@ -143,15 +143,19 @@ pub(crate) fn build_page_filter(
143143
}),
144144
);
145145
} else {
146+
trace!(
147+
"Did not have enough metadata to prune with page indexes, falling back to all rows",
148+
);
146149
// fallback select all rows
147150
let all_selected =
148151
vec![RowSelector::select(groups[*r].num_rows() as usize)];
149152
selectors.push(all_selected);
150153
}
151154
}
152155
debug!(
153-
"Use filter and page index create RowSelection {:?} from predicate:{:?}",
154-
&selectors, predicate
156+
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
157+
&selectors,
158+
predicate.predicate_expr(),
155159
);
156160
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
157161
}
@@ -322,6 +326,8 @@ fn prune_pages_in_one_row_group(
322326
let mut sum_row = *row_vec.first().unwrap();
323327
let mut selected = *values.first().unwrap();
324328

329+
trace!("Pruned to {:?} using {:?}", values, pruning_stats);
330+
325331
for (i, &f) in values.iter().skip(1).enumerate() {
326332
if f == selected {
327333
sum_row += *row_vec.get(i).unwrap();
@@ -376,6 +382,7 @@ fn create_row_count_in_each_page(
376382

377383
/// Wraps one col page_index in one rowGroup statistics in a way
378384
/// that implements [`PruningStatistics`]
385+
#[derive(Debug)]
379386
struct PagesPruningStatistics<'a> {
380387
col_page_indexes: &'a Index,
381388
col_offset_indexes: &'a Vec<PageLocation>,

0 commit comments

Comments
 (0)