Skip to content

Commit

Permalink
[parquet] Avoid read parquet index when there is no filter pushdown. (#…
Browse files Browse the repository at this point in the history
…6317)

* [parquet] Avoid read parquet index when there is no filter pushdown.

* fix clippy

* fix fmt

* fix fmt2
  • Loading branch information
Ted-Jiang authored May 11, 2023
1 parent 1921ee6 commit a07d6eb
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
17 changes: 16 additions & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,10 @@ impl FileOpener for ParquetOpener {
let table_schema = self.table_schema.clone();
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
let enable_page_index = self.enable_page_index;
let enable_page_index = should_enable_page_index(
self.enable_page_index,
&self.page_pruning_predicate,
);
let limit = self.limit;

Ok(Box::pin(async move {
Expand Down Expand Up @@ -572,6 +575,18 @@ impl FileOpener for ParquetOpener {
}
}

fn should_enable_page_index(
enable_page_index: bool,
page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
) -> bool {
enable_page_index
&& page_pruning_predicate.is_some()
&& page_pruning_predicate
.as_ref()
.map(|p| p.filter_number() > 0)
.unwrap_or(false)
}

/// Factory of parquet file readers.
///
/// Provides means to implement custom data access interface.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ impl PagePruningPredicate {
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection))
}

/// Returns the number of filters in the [`PagePruningPredicate`]
pub fn filter_number(&self) -> usize {
self.predicates.len()
}
}

/// Returns the column index in the row group metadata for the single
Expand Down
39 changes: 39 additions & 0 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_common::{ScalarValue, Statistics, ToDFSchema};
Expand Down Expand Up @@ -714,3 +715,41 @@ async fn prune_decimal_in_list() {
)
.await;
}

#[tokio::test]
async fn without_pushdown_filter() {
let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;

let output1 = context.query("SELECT * FROM t").await;

let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;

let output2 = context
.query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
.await;

let bytes_scanned_without_filter = cast_count_metric(
output1
.parquet_metrics
.sum_by_name("bytes_scanned")
.unwrap(),
)
.unwrap();
let bytes_scanned_with_filter = cast_count_metric(
output2
.parquet_metrics
.sum_by_name("bytes_scanned")
.unwrap(),
)
.unwrap();

// Without filter will not read pageIndex.
assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
}

fn cast_count_metric(metric: MetricValue) -> Option<usize> {
match metric {
MetricValue::Count { count, .. } => Some(count.value()),
_ => None,
}
}

0 comments on commit a07d6eb

Please sign in to comment.