
parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener #15561


Merged: 15 commits, Apr 6, 2025
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -67,13 +67,13 @@ pub(crate) mod test_util {
.into_iter()
.zip(tmp_files.into_iter())
.map(|(batch, mut output)| {
let builder = parquet::file::properties::WriterProperties::builder();
let props = if multi_page {
builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
} else {
builder
let mut builder = parquet::file::properties::WriterProperties::builder();
if multi_page {
builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
}
.build();
builder = builder.set_bloom_filter_enabled(true);
Contributor Author:

Since I'm now using the actual explain plan to check that bloom filters are used, it's easiest to just enable them and check that they pruned. It's also a more realistic test, in that it's what I as a user would do to check whether bloom filters are working.
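For illustration, a user-level check along these lines (a rough sketch; the file name `data.parquet` and column `c1` are placeholders, not part of this PR) would surface the bloom filter metric in the rendered plan:

```rust
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Placeholder path/column: any parquet file written with bloom filters enabled.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    let batches = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM t WHERE c1 = 'foo'")
        .await?
        .collect()
        .await?;

    // Render the ANALYZE output and look for the bloom filter pruning metric.
    let plan = pretty_format_batches(&batches)?.to_string();
    println!("{plan}");
    assert!(plan.contains("row_groups_pruned_bloom_filter"));
    Ok(())
}
```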


let props = builder.build();

let mut writer = parquet::arrow::ArrowWriter::try_new(
&mut output,
282 changes: 109 additions & 173 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -43,6 +43,7 @@ mod tests {
};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use bytes::{BufMut, BytesMut};
use datafusion_common::config::TableParquetOptions;
@@ -61,8 +62,9 @@ mod tests {
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::analyze::AnalyzeExec;
use datafusion_physical_plan::collect;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::{collect, displayable};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use chrono::{TimeZone, Utc};
@@ -81,10 +83,10 @@ mod tests {
struct RoundTripResult {
/// Data that was read back from ParquetFiles
batches: Result<Vec<RecordBatch>>,
/// The EXPLAIN ANALYZE output
explain: Result<String>,
/// The physical plan that was created (that has statistics, etc)
parquet_exec: Arc<DataSourceExec>,
/// The ParquetSource that is used in plan
parquet_source: ParquetSource,
}

/// round-trip record batches by writing each individual RecordBatch to
@@ -137,71 +139,109 @@ mod tests {
self.round_trip(batches).await.batches
}

/// run the test, returning the `RoundTripResult`
async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult {
let Self {
projection,
schema,
predicate,
pushdown_predicate,
page_index_predicate,
} = self;

let file_schema = match schema {
Some(schema) => schema,
None => Arc::new(
Schema::try_merge(
batches.iter().map(|b| b.schema().as_ref().clone()),
)
.unwrap(),
),
};
// If testing with page_index_predicate, write parquet
// files with multiple pages
let multi_page = page_index_predicate;
let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
let file_group = meta.into_iter().map(Into::into).collect();

fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
// set up predicate (this is normally done by a layer higher up)
let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
let predicate = self
.predicate
.as_ref()
.map(|p| logical2physical(p, &file_schema));

let mut source = ParquetSource::default();
if let Some(predicate) = predicate {
source = source.with_predicate(Arc::clone(&file_schema), predicate);
}

if pushdown_predicate {
if self.pushdown_predicate {
source = source
.with_pushdown_filters(true)
.with_reorder_filters(true);
}

if page_index_predicate {
if self.page_index_predicate {
source = source.with_enable_page_index(true);
}

Arc::new(source)
}

fn build_parquet_exec(
&self,
file_schema: SchemaRef,
file_group: FileGroup,
source: Arc<ParquetSource>,
) -> Arc<DataSourceExec> {
let base_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
Arc::new(source.clone()),
source,
)
.with_file_group(file_group)
.with_projection(projection)
.with_projection(self.projection.clone())
.build();
DataSourceExec::from_data_source(base_config)
}

/// run the test, returning the `RoundTripResult`
async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
let file_schema = match &self.schema {
Some(schema) => schema,
None => &Arc::new(
Schema::try_merge(
batches.iter().map(|b| b.schema().as_ref().clone()),
)
.unwrap(),
),
};
let file_schema = Arc::clone(file_schema);
// If testing with page_index_predicate, write parquet
// files with multiple pages
let multi_page = self.page_index_predicate;
let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
let file_group: FileGroup = meta.into_iter().map(Into::into).collect();

// build a ParquetExec to return the results
let parquet_source = self.build_file_source(file_schema.clone());
let parquet_exec = self.build_parquet_exec(
file_schema.clone(),
file_group.clone(),
Arc::clone(&parquet_source),
);

let analyze_exec = Arc::new(AnalyzeExec::new(
false,
false,
// use a new ParquetSource to avoid sharing execution metrics
self.build_parquet_exec(
file_schema.clone(),
file_group.clone(),
self.build_file_source(file_schema.clone()),
),
Arc::new(Schema::new(vec![
Field::new("plan_type", DataType::Utf8, true),
Field::new("plan", DataType::Utf8, true),
])),
));
Comment on lines +210 to +223 (Contributor Author):

The reason I'm refactoring RoundTrip like this is that I need to create the file source twice: ParquetSource has internal Metrics, so if we run the query against the same ParquetSource twice (once for the data and once for the EXPLAIN ANALYZE plan) we end up with duplicate metrics. Cloning it doesn't help, because the metrics themselves are Arc'd.

I think asserting against the explain plan, rather than against a handle to the ParquetSource, is generally more in line with how real-world users use DataFusion to debug whether the page index, statistics, etc. are actually pruning.
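To illustrate the cloning point with a standalone sketch (plain Rust, not DataFusion's actual types): when the metric counters sit behind an Arc, cloning copies the handle, so both "copies" keep bumping the same counters:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Illustrative stand-in for a source whose metrics live behind an Arc.
#[derive(Clone)]
struct Source {
    rows_pruned: Arc<AtomicUsize>,
}

fn main() {
    let a = Source { rows_pruned: Arc::new(AtomicUsize::new(0)) };
    let b = a.clone(); // clone copies the Arc, not the counter

    a.rows_pruned.fetch_add(5, Ordering::Relaxed);
    b.rows_pruned.fetch_add(5, Ordering::Relaxed);

    // Both handles observe 10: the two "copies" share one metric, which is
    // why the test builds a fresh ParquetSource instead of cloning one.
    assert_eq!(a.rows_pruned.load(Ordering::Relaxed), 10);
}
```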

Contributor:

I agree, this sounds good to me.


let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();

let parquet_exec = DataSourceExec::from_data_source(base_config.clone());
let batches = collect(
Arc::clone(&parquet_exec) as Arc<dyn ExecutionPlan>,
task_ctx.clone(),
)
.await;

let explain = collect(analyze_exec, task_ctx.clone())
.await
.map(|batches| {
let batches = pretty_format_batches(&batches).unwrap();
format!("{batches}")
});

RoundTripResult {
batches: collect(parquet_exec.clone(), task_ctx).await,
batches,
explain,
parquet_exec,
parquet_source: base_config
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.unwrap()
.clone(),
}
}
}
@@ -1375,26 +1415,6 @@ mod tests {
create_batch(vec![("c1", c1.clone())])
}

/// Returns a int64 array with contents:
/// "[-1, 1, null, 2, 3, null, null]"
fn int64_batch() -> RecordBatch {
let contents: ArrayRef = Arc::new(Int64Array::from(vec![
Some(-1),
Some(1),
None,
Some(2),
Some(3),
None,
None,
]));

create_batch(vec![
("a", contents.clone()),
("b", contents.clone()),
("c", contents.clone()),
])
}

#[tokio::test]
async fn parquet_exec_metrics() {
// batch1: c1(string)
@@ -1454,110 +1474,17 @@ mod tests {
.round_trip(vec![batch1])
.await;

// should have a pruning predicate
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());

// convert to explain plan form
let display = displayable(rt.parquet_exec.as_ref())
.indent(true)
.to_string();

assert_contains!(
&display,
"pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
);

assert_contains!(&display, r#"predicate=c1@0 != bar"#);

assert_contains!(&display, "projection=[c1]");
}

#[tokio::test]
async fn parquet_exec_display_deterministic() {
Comment on lines -1476 to -1477 (Contributor Author):

I removed this whole test because it seems the point was to check that required_guarantees= doesn't change from run to run, but that is no longer part of the output, so I don't see what this test would be testing.

Contributor:

I think the guarantees were in a HashSet and printed to the explain plan, so they could change from run to run.
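A standalone sketch of that failure mode (plain Rust, not the DataFusion code): the default hasher is randomly seeded per process, so the rendered order of a HashSet can differ between runs, and a direct string comparison is only stable if the entries are sorted first:

```rust
use std::collections::HashSet;

fn main() {
    let guarantees: HashSet<&str> =
        ["a in (1)", "b in (1)", "c in (1)"].into_iter().collect();

    // Iteration order is unspecified and may differ from run to run,
    // so asserting on this string directly would make the test flaky.
    println!("required_guarantees={guarantees:?}");

    // Sorting first yields a deterministic rendering to assert against.
    let mut sorted: Vec<_> = guarantees.into_iter().collect();
    sorted.sort_unstable();
    assert_eq!(sorted, ["a in (1)", "b in (1)", "c in (1)"]);
}
```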

Contributor Author:

Right, I guess the point is: now that they are no longer printed to the explain plan, is this test still needed, or can we bin it?

// batches: a(int64), b(int64), c(int64)
let batches = int64_batch();

fn extract_required_guarantees(s: &str) -> Option<&str> {
s.split("required_guarantees=").nth(1)
}

// Ensuring that the required_guarantees remain consistent across every display plan of the filter conditions
for _ in 0..100 {
// c = 1 AND b = 1 AND a = 1
let filter0 = col("c")
.eq(lit(1))
.and(col("b").eq(lit(1)))
.and(col("a").eq(lit(1)));

let rt0 = RoundTrip::new()
.with_predicate(filter0)
.with_pushdown_predicate()
.round_trip(vec![batches.clone()])
.await;

let pruning_predicate = rt0.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());

let display0 = displayable(rt0.parquet_exec.as_ref())
.indent(true)
.to_string();

let guarantees0: &str = extract_required_guarantees(&display0)
.expect("Failed to extract required_guarantees");
// Compare only the required_guarantees part (Because the file_groups part will not be the same)
assert_eq!(
guarantees0.trim(),
"[a in (1), b in (1), c in (1)]",
"required_guarantees don't match"
);
}
let explain = rt.explain.unwrap();

// c = 1 AND a = 1 AND b = 1
let filter1 = col("c")
.eq(lit(1))
.and(col("a").eq(lit(1)))
.and(col("b").eq(lit(1)));
// check that there was a pruning predicate -> row groups got pruned
assert_contains!(&explain, "predicate=c1@0 != bar");

let rt1 = RoundTrip::new()
.with_predicate(filter1)
.with_pushdown_predicate()
.round_trip(vec![batches.clone()])
.await;

// b = 1 AND a = 1 AND c = 1
let filter2 = col("b")
.eq(lit(1))
.and(col("a").eq(lit(1)))
.and(col("c").eq(lit(1)));
// there's a single row group, but we can check that it matched
// if no pruning was done this would be 0 instead of 1
assert_contains!(&explain, "row_groups_matched_statistics=1");

let rt2 = RoundTrip::new()
.with_predicate(filter2)
.with_pushdown_predicate()
.round_trip(vec![batches])
.await;

// should have a pruning predicate
let pruning_predicate = rt1.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());
let pruning_predicate = rt2.parquet_source.predicate();
assert!(pruning_predicate.is_some());

// convert to explain plan form
let display1 = displayable(rt1.parquet_exec.as_ref())
.indent(true)
.to_string();
let display2 = displayable(rt2.parquet_exec.as_ref())
.indent(true)
.to_string();

let guarantees1 = extract_required_guarantees(&display1)
.expect("Failed to extract required_guarantees");
let guarantees2 = extract_required_guarantees(&display2)
.expect("Failed to extract required_guarantees");

// Compare only the required_guarantees part (Because the predicate part will not be the same)
assert_eq!(guarantees1, guarantees2, "required_guarantees don't match");
// check the projection
assert_contains!(&explain, "projection=[c1]");
}

#[tokio::test]
@@ -1581,16 +1508,19 @@ mod tests {
.await;

// Should not contain a pruning predicate (since nothing can be pruned)
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(
pruning_predicate.is_none(),
"Still had pruning predicate: {pruning_predicate:?}"
);
let explain = rt.explain.unwrap();

// but does still has a pushdown down predicate
let predicate = rt.parquet_source.predicate();
let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref());
assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
// When both matched and pruned are 0, it means that the pruning predicate
// was not used at all.
Comment on lines +1513 to +1514 (Contributor Author):

Not super intuitive (I'd prefer it were null or none, or that row_groups_matched_statistics was simply not included), but I've left a comment to clarify for future readers.

assert_contains!(&explain, "row_groups_matched_statistics=0");
assert_contains!(&explain, "row_groups_pruned_statistics=0");

// But pushdown predicate should be present
assert_contains!(
&explain,
"predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
);
assert_contains!(&explain, "pushdown_rows_pruned=5");
}

#[tokio::test]
@@ -1616,8 +1546,14 @@ mod tests {
.await;

// Should have a pruning predicate
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());
let explain = rt.explain.unwrap();
assert_contains!(
&explain,
"predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END"
);

// And bloom filters should have been evaluated
assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
}

/// Returns the sum of all the metrics with the specified name