Commit 1c97dc4

Add statistics for parquet page level skipping
Signed-off-by: yangjiang <yangjiang@ebay.com>
1 parent f61b43a commit 1c97dc4

4 files changed: +119 -29 lines changed


datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 40 additions & 20 deletions
@@ -532,25 +532,45 @@ pub(crate) mod test_util {
 
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
+        multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        let files: Vec<_> = batches
-            .into_iter()
-            .map(|batch| {
-                let mut output = NamedTempFile::new().expect("creating temp file");
-
-                let props = WriterProperties::builder().build();
-                let mut writer =
-                    ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
-                        .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-                output
-            })
-            .collect();
-
-        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
-        Ok((meta, files))
+        if multi_page {
+            // All batches write in to one file, each batch must have same schema.
+            let mut output = NamedTempFile::new().expect("creating temp file");
+            let mut builder = WriterProperties::builder();
+            // todo https://github.com/apache/arrow-rs/issues/2941 release change to row limit.
+            builder = builder.set_data_pagesize_limit(1);
+            builder = builder.set_write_batch_size(1);
+            let proper = builder.build();
+            let mut writer =
+                ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
+                    .expect("creating writer");
+            for b in batches {
+                writer.write(&b).expect("Writing batch");
+            }
+            writer.close().unwrap();
+            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
+        } else {
+            // Each batch writes to their own file
+            let files: Vec<_> = batches
+                .into_iter()
+                .map(|batch| {
+                    let mut output = NamedTempFile::new().expect("creating temp file");
+
+                    let props = WriterProperties::builder().build();
+                    let mut writer =
+                        ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+                            .expect("creating writer");
+
+                    writer.write(&batch).expect("Writing batch");
+                    writer.close().unwrap();
+                    output
+                })
+                .collect();
+
+            let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+            Ok((meta, files))
+        }
     }
 }
 
@@ -599,7 +619,7 @@ mod tests {
         let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
@@ -738,7 +758,7 @@ mod tests {
         let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
            LocalFileSystem::new(),
        )));
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
        // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
        // for the remaining metadata

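The multi_page branch above relies on two writer knobs to force many tiny pages, which is what makes page-level skipping observable in the tests. Below is a minimal standalone sketch of the same idea; the column name, values, and output path are purely illustrative, and the builder method names assume the arrow-rs release this commit builds against (the todo above notes they may change to a row-based limit).

use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One small batch; with the limits below the writer starts a new page
    // after every value instead of packing them into a single page.
    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let batch = RecordBatch::try_from_iter(vec![("int", ints)])?;

    // Same knobs as the multi_page branch above.
    let props = WriterProperties::builder()
        .set_data_pagesize_limit(1)
        .set_write_batch_size(1)
        .build();

    // Hypothetical output path, for illustration only.
    let file = File::create("/tmp/multi_page_example.parquet")?;
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}

Because both limits are 1, each written value closes a page, so the page index for the int column gets one entry per row and the page-level filter has something meaningful to prune.
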
datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 52 additions & 9 deletions
@@ -449,7 +449,7 @@ impl FileOpener for ParquetOpener {
             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
-            if let Some(row_selection) = enable_page_index
+            if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
                 .then(|| {
                     page_filter::build_page_filter(
                         pruning_predicate.as_ref(),
@@ -919,7 +919,7 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::Float32Array;
+    use arrow::array::{Float32Array, Int32Array};
     use arrow::datatypes::DataType::Decimal128;
     use arrow::record_batch::RecordBatch;
     use arrow::{
@@ -960,9 +960,16 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
-        round_trip(batches, projection, schema, predicate, pushdown_predicate)
-            .await
-            .batches
+        round_trip(
+            batches,
+            projection,
+            schema,
+            predicate,
+            pushdown_predicate,
+            false,
+        )
+        .await
+        .batches
     }
 
     /// Writes each RecordBatch as an individual parquet file and then
@@ -974,6 +981,7 @@ mod tests {
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
+        page_index_predicate: bool,
     ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
@@ -983,7 +991,7 @@ mod tests {
             ),
         };
 
-        let (meta, _files) = store_parquet(batches).await.unwrap();
+        let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
@@ -1008,6 +1016,10 @@ mod tests {
                 .with_reorder_filters(true);
         }
 
+        if page_index_predicate {
+            parquet_exec = parquet_exec.with_enable_page_index(true);
+        }
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let parquet_exec = Arc::new(parquet_exec);
@@ -1225,7 +1237,8 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1374,7 +1387,8 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
 
         let expected = vec![
             "+----+----+",
@@ -1695,6 +1709,35 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_page_index_exec_metrics() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let batch1 = create_batch(vec![("int", c1.clone())]);
+        let batch2 = create_batch(vec![("int", c2.clone())]);
+
+        let filter = col("int").eq(lit(4_i32));
+
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // todo fix this https://github.com/apache/arrow-rs/issues/2941 release change to row limit.
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| int |", "+-----+", "|     |", "| 1   |", "| 2   |", "| 3   |",
+            "| 4   |", "| 5   |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 0);
+        assert!(
+            get_value(&metrics, "page_index_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
     #[tokio::test]
     async fn parquet_exec_metrics() {
         let c1: ArrayRef = Arc::new(StringArray::from(vec![
@@ -1714,7 +1757,7 @@ mod tests {
         let filter = col("c1").not_eq(lit("bar"));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
 

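The !row_groups.is_empty() guard added in the first hunk above works because Rust's bool::then only evaluates its closure when the condition is true, so a file whose row groups were all pruned earlier never pays for page-filter evaluation (and never records misleading page-index metrics). A self-contained sketch of that pattern; the expensive_page_filter helper is hypothetical and merely stands in for page_filter::build_page_filter:

// Minimal illustration of the bool::then pattern used in ParquetOpener above.
fn expensive_page_filter() -> Option<Vec<usize>> {
    println!("building page filter");
    Some(vec![0, 10, 20])
}

fn main() {
    let enable_page_index = true;
    let row_groups: Vec<usize> = vec![]; // pretend every row group was pruned earlier

    // The closure never runs when the combined condition is false, so no
    // page-index work happens for files with no remaining row groups.
    let selection = (enable_page_index && !row_groups.is_empty())
        .then(expensive_page_filter)
        .flatten();

    assert!(selection.is_none());
}
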
datafusion/core/src/physical_plan/file_format/parquet/metrics.rs

Lines changed: 13 additions & 0 deletions
@@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
     pub pushdown_rows_filtered: Count,
     /// Total time spent evaluating pushdown filters
     pub pushdown_eval_time: Time,
+    /// Total rows filtered out by parquet page index
+    pub page_index_rows_filtered: Count,
+    /// Total time spent evaluating parquet page index filters
+    pub page_index_eval_time: Time,
 }
 
 impl ParquetFileMetrics {
@@ -63,13 +67,22 @@ impl ParquetFileMetrics {
         let pushdown_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("pushdown_eval_time", partition);
+        let page_index_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("page_index_rows_filtered", partition);
+
+        let page_index_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("page_index_eval_time", partition);
 
         Self {
             predicate_evaluation_errors,
             row_groups_pruned,
             bytes_scanned,
             pushdown_rows_filtered,
             pushdown_eval_time,
+            page_index_rows_filtered,
+            page_index_eval_time,
         }
     }
 }

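For context, the two new fields follow the existing Count/Time pattern: a counter that is bumped explicitly, and a timer whose scoped guard adds elapsed wall time when it is dropped. A rough sketch of that usage, mirroring how build_page_filter (next file) consumes the metrics; the prune_pages function and the placeholder row count are illustrative and not DataFusion APIs:

use datafusion::physical_plan::metrics::{Count, Time};

fn prune_pages(rows_filtered: &Count, eval_time: &Time) {
    // the guard records elapsed time into eval_time when it goes out of scope
    let _timer_guard = eval_time.timer();

    // ... evaluate the page index predicates here ...
    let rows_skipped_by_page_index = 42; // placeholder value for illustration

    rows_filtered.add(rows_skipped_by_page_index);
}

fn main() {
    let rows_filtered = Count::new();
    let eval_time = Time::new();
    prune_pages(&rows_filtered, &eval_time);
    println!(
        "filtered {} rows in {} ns",
        rows_filtered.value(),
        eval_time.value()
    );
}
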
datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs

Lines changed: 14 additions & 0 deletions
@@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
     file_metadata: &ParquetMetaData,
     file_metrics: &ParquetFileMetrics,
 ) -> Result<Option<RowSelection>> {
+    // scoped timer updates on drop
+    let _timer_guard = file_metrics.page_index_eval_time.timer();
     let page_index_predicates =
         extract_page_index_push_down_predicates(pruning_predicate, schema)?;
 
@@ -154,6 +156,18 @@ pub(crate) fn build_page_filter(
             row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
         }
         let final_selection = combine_multi_col_selection(row_selections);
+        let total_skip =
+            final_selection.iter().fold(
+                0,
+                |acc, x| {
+                    if x.skip {
+                        acc + x.row_count
+                    } else {
+                        acc
+                    }
+                },
+            );
+        file_metrics.page_index_rows_filtered.add(total_skip);
         Ok(Some(final_selection.into()))
     } else {
         Ok(None)

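The fold in the second hunk above simply sums the row_count of every selector marked skip. The same computation written with iterator adapters, using a locally defined RowSelector stand-in rather than parquet's type, might look like this sketch:

#[derive(Clone, Copy)]
struct RowSelector {
    row_count: usize,
    skip: bool,
}

// Equivalent to the fold above: total number of rows the page index skips.
fn total_skipped(selection: &[RowSelector]) -> usize {
    selection
        .iter()
        .filter(|s| s.skip)   // keep only the "skip these rows" runs
        .map(|s| s.row_count) // count rows in each skipped run
        .sum()
}

fn main() {
    let selection = [
        RowSelector { row_count: 100, skip: true },
        RowSelector { row_count: 25, skip: false },
        RowSelector { row_count: 10, skip: true },
    ];
    assert_eq!(total_skipped(&selection), 110);
    println!("page index filtered {} rows", total_skipped(&selection));
}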