From a07d6ebc9f31fb65bff4d5c66536ed3485d6db78 Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Thu, 11 May 2023 19:14:30 +0800 Subject: [PATCH] [parquet] Avoid read parquet index when there is no filter pushdown. (#6317) * [parquet] Avoid read parquet index when there is no filter pushdown. * fix clippy * fix fmt * fix fmt2 --- .../src/physical_plan/file_format/parquet.rs | 17 +++++++- .../file_format/parquet/page_filter.rs | 5 +++ datafusion/core/tests/parquet/page_pruning.rs | 39 +++++++++++++++++++ 3 files changed, 60 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 641be002b6ad..9843ecc22e4c 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -482,7 +482,10 @@ impl FileOpener for ParquetOpener { let table_schema = self.table_schema.clone(); let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; - let enable_page_index = self.enable_page_index; + let enable_page_index = should_enable_page_index( + self.enable_page_index, + &self.page_pruning_predicate, + ); let limit = self.limit; Ok(Box::pin(async move { @@ -572,6 +575,18 @@ impl FileOpener for ParquetOpener { } } +fn should_enable_page_index( + enable_page_index: bool, + page_pruning_predicate: &Option>, +) -> bool { + enable_page_index + && page_pruning_predicate.is_some() + && page_pruning_predicate + .as_ref() + .map(|p| p.filter_number() > 0) + .unwrap_or(false) +} + /// Factory of parquet file readers. /// /// Provides means to implement custom data access interface. diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs index 410c43c57d71..00e55c41ad09 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs @@ -222,6 +222,11 @@ impl PagePruningPredicate { file_metrics.page_index_rows_filtered.add(total_skip); Ok(Some(final_selection)) } + + /// Returns the number of filters in the [`PagePruningPredicate`] + pub fn filter_number(&self) -> usize { + self.predicates.len() + } } /// Returns the column index in the row group metadata for the single diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index baf9d2d36a17..1d444326bbc4 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -23,6 +23,7 @@ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::execution::context::SessionState; use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; +use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_common::{ScalarValue, Statistics, ToDFSchema}; @@ -714,3 +715,41 @@ async fn prune_decimal_in_list() { ) .await; } + +#[tokio::test] +async fn without_pushdown_filter() { + let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await; + + let output1 = context.query("SELECT * FROM t").await; + + let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await; + + let output2 = context + .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')") + .await; + + let bytes_scanned_without_filter = cast_count_metric( + output1 + .parquet_metrics + .sum_by_name("bytes_scanned") + .unwrap(), + ) + .unwrap(); + let bytes_scanned_with_filter = cast_count_metric( + output2 + .parquet_metrics + .sum_by_name("bytes_scanned") + .unwrap(), + ) + .unwrap(); + + // Without filter will not read pageIndex. + assert!(bytes_scanned_with_filter > bytes_scanned_without_filter); +} + +fn cast_count_metric(metric: MetricValue) -> Option { + match metric { + MetricValue::Count { count, .. } => Some(count.value()), + _ => None, + } +}