parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener #15561
Changes from all commits
@@ -43,6 +43,7 @@ mod tests {
};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use bytes::{BufMut, BytesMut};
use datafusion_common::config::TableParquetOptions;

@@ -61,8 +62,9 @@ mod tests {
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::analyze::AnalyzeExec;
use datafusion_physical_plan::collect;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::{collect, displayable};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use chrono::{TimeZone, Utc};
@@ -81,10 +83,10 @@ mod tests {
struct RoundTripResult {
/// Data that was read back from ParquetFiles
batches: Result<Vec<RecordBatch>>,
/// The EXPLAIN ANALYZE output
explain: Result<String>,
/// The physical plan that was created (that has statistics, etc)
parquet_exec: Arc<DataSourceExec>,
/// The ParquetSource that is used in plan
parquet_source: ParquetSource,
}

/// round-trip record batches by writing each individual RecordBatch to
@@ -137,71 +139,109 @@ mod tests {
self.round_trip(batches).await.batches
}

/// run the test, returning the `RoundTripResult`
async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult {
let Self {
projection,
schema,
predicate,
pushdown_predicate,
page_index_predicate,
} = self;

let file_schema = match schema {
Some(schema) => schema,
None => Arc::new(
Schema::try_merge(
batches.iter().map(|b| b.schema().as_ref().clone()),
)
.unwrap(),
),
};
// If testing with page_index_predicate, write parquet
// files with multiple pages
let multi_page = page_index_predicate;
let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
let file_group = meta.into_iter().map(Into::into).collect();

fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
// set up predicate (this is normally done by a layer higher up)
let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
let predicate = self
.predicate
.as_ref()
.map(|p| logical2physical(p, &file_schema));

let mut source = ParquetSource::default();
if let Some(predicate) = predicate {
source = source.with_predicate(Arc::clone(&file_schema), predicate);
}

if pushdown_predicate {
if self.pushdown_predicate {
source = source
.with_pushdown_filters(true)
.with_reorder_filters(true);
}

if page_index_predicate {
if self.page_index_predicate {
source = source.with_enable_page_index(true);
}

Arc::new(source)
}

fn build_parquet_exec(
&self,
file_schema: SchemaRef,
file_group: FileGroup,
source: Arc<ParquetSource>,
) -> Arc<DataSourceExec> {
let base_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
Arc::new(source.clone()),
source,
)
.with_file_group(file_group)
.with_projection(projection)
.with_projection(self.projection.clone())
.build();
DataSourceExec::from_data_source(base_config)
}

/// run the test, returning the `RoundTripResult`
async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
let file_schema = match &self.schema {
Some(schema) => schema,
None => &Arc::new(
Schema::try_merge(
batches.iter().map(|b| b.schema().as_ref().clone()),
)
.unwrap(),
),
};
let file_schema = Arc::clone(file_schema);
// If testing with page_index_predicate, write parquet
// files with multiple pages
let multi_page = self.page_index_predicate;
let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
let file_group: FileGroup = meta.into_iter().map(Into::into).collect();

// build a ParquetExec to return the results
let parquet_source = self.build_file_source(file_schema.clone());
let parquet_exec = self.build_parquet_exec(
file_schema.clone(),
file_group.clone(),
Arc::clone(&parquet_source),
);

let analyze_exec = Arc::new(AnalyzeExec::new(
false,
false,
// use a new ParquetSource to avoid sharing execution metrics
self.build_parquet_exec(
file_schema.clone(),
file_group.clone(),
self.build_file_source(file_schema.clone()),
),
Arc::new(Schema::new(vec![
Field::new("plan_type", DataType::Utf8, true),
Field::new("plan", DataType::Utf8, true),
])),
));
Comment on lines +210 to +223:

The reason I'm refactoring: I think generally asserting against the explain plan and not a handle to the …

I agree, this sounds good to me.
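A minimal, standalone sketch of the assertion style being discussed here, using the public SessionContext API rather than the test's RoundTrip helper (the table name, file path, and query below are made up for illustration):

```rust
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // hypothetical parquet file; any file with a string column `c1` would do
    ctx.register_parquet("t", "data/t.parquet", ParquetReadOptions::default())
        .await?;

    // EXPLAIN ANALYZE executes the plan and reports per-operator metrics
    let batches = ctx
        .sql("EXPLAIN ANALYZE SELECT c1 FROM t WHERE c1 <> 'bar'")
        .await?
        .collect()
        .await?;
    let explain = pretty_format_batches(&batches)?.to_string();

    // assert on what actually happened at runtime (e.g. row-group pruning)
    // instead of inspecting a handle to the ParquetSource
    assert!(explain.contains("row_groups_matched_statistics"));
    Ok(())
}
```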
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();

let parquet_exec = DataSourceExec::from_data_source(base_config.clone());
let batches = collect(
Arc::clone(&parquet_exec) as Arc<dyn ExecutionPlan>,
task_ctx.clone(),
)
.await;

let explain = collect(analyze_exec, task_ctx.clone())
.await
.map(|batches| {
let batches = pretty_format_batches(&batches).unwrap();
format!("{batches}")
});

RoundTripResult {
batches: collect(parquet_exec.clone(), task_ctx).await,
batches,
explain,
parquet_exec,
parquet_source: base_config
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.unwrap()
.clone(),
}
}
}
@@ -1375,26 +1415,6 @@ mod tests {
create_batch(vec![("c1", c1.clone())])
}

/// Returns a int64 array with contents:
/// "[-1, 1, null, 2, 3, null, null]"
fn int64_batch() -> RecordBatch {
let contents: ArrayRef = Arc::new(Int64Array::from(vec![
Some(-1),
Some(1),
None,
Some(2),
Some(3),
None,
None,
]));

create_batch(vec![
("a", contents.clone()),
("b", contents.clone()),
("c", contents.clone()),
])
}

#[tokio::test]
async fn parquet_exec_metrics() {
// batch1: c1(string)
@@ -1454,110 +1474,17 @@ mod tests {
.round_trip(vec![batch1])
.await;

// should have a pruning predicate
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());

// convert to explain plan form
let display = displayable(rt.parquet_exec.as_ref())
.indent(true)
.to_string();

assert_contains!(
&display,
"pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
);

assert_contains!(&display, r#"predicate=c1@0 != bar"#);

assert_contains!(&display, "projection=[c1]");
}

#[tokio::test]
async fn parquet_exec_display_deterministic() {
Comment on lines -1476 to -1477:

I canned this whole test because it seems the point is to check that …

I think the guarantees were in a HashSet and printed to the explain plan, so they could change from run to run.

Right, I guess the point is: now that they are no longer printed to the explain plan, is this test needed or can we bin it?
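As background on the non-determinism mentioned above, a tiny self-contained sketch (not DataFusion code) of why values held in a std::collections::HashSet can print in a different order on each run, and how sorting before formatting would make the rendered guarantees stable:

```rust
use std::collections::HashSet;

fn main() {
    // guarantees like "a in (1)" collected into a HashSet
    let guarantees: HashSet<&str> =
        ["c in (1)", "a in (1)", "b in (1)"].into_iter().collect();

    // HashSet iteration order depends on a per-process random hash seed,
    // so this line can differ from one run to the next
    println!("unsorted: {:?}", guarantees.iter().collect::<Vec<_>>());

    // sorting before display yields deterministic output
    let mut sorted: Vec<&str> = guarantees.iter().copied().collect();
    sorted.sort_unstable();
    assert_eq!(sorted, ["a in (1)", "b in (1)", "c in (1)"]);
}
```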
// batches: a(int64), b(int64), c(int64)
let batches = int64_batch();

fn extract_required_guarantees(s: &str) -> Option<&str> {
s.split("required_guarantees=").nth(1)
}

// Ensuring that the required_guarantees remain consistent across every display plan of the filter conditions
for _ in 0..100 {
// c = 1 AND b = 1 AND a = 1
let filter0 = col("c")
.eq(lit(1))
.and(col("b").eq(lit(1)))
.and(col("a").eq(lit(1)));

let rt0 = RoundTrip::new()
.with_predicate(filter0)
.with_pushdown_predicate()
.round_trip(vec![batches.clone()])
.await;

let pruning_predicate = rt0.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());

let display0 = displayable(rt0.parquet_exec.as_ref())
.indent(true)
.to_string();

let guarantees0: &str = extract_required_guarantees(&display0)
.expect("Failed to extract required_guarantees");
// Compare only the required_guarantees part (Because the file_groups part will not be the same)
assert_eq!(
guarantees0.trim(),
"[a in (1), b in (1), c in (1)]",
"required_guarantees don't match"
);
}
let explain = rt.explain.unwrap();

// c = 1 AND a = 1 AND b = 1
let filter1 = col("c")
.eq(lit(1))
.and(col("a").eq(lit(1)))
.and(col("b").eq(lit(1)));
// check that there was a pruning predicate -> row groups got pruned
assert_contains!(&explain, "predicate=c1@0 != bar");

let rt1 = RoundTrip::new()
.with_predicate(filter1)
.with_pushdown_predicate()
.round_trip(vec![batches.clone()])
.await;

// b = 1 AND a = 1 AND c = 1
let filter2 = col("b")
.eq(lit(1))
.and(col("a").eq(lit(1)))
.and(col("c").eq(lit(1)));
// there's a single row group, but we can check that it matched
// if no pruning was done this would be 0 instead of 1
assert_contains!(&explain, "row_groups_matched_statistics=1");

let rt2 = RoundTrip::new()
.with_predicate(filter2)
.with_pushdown_predicate()
.round_trip(vec![batches])
.await;

// should have a pruning predicate
let pruning_predicate = rt1.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());
let pruning_predicate = rt2.parquet_source.predicate();
assert!(pruning_predicate.is_some());

// convert to explain plan form
let display1 = displayable(rt1.parquet_exec.as_ref())
.indent(true)
.to_string();
let display2 = displayable(rt2.parquet_exec.as_ref())
.indent(true)
.to_string();

let guarantees1 = extract_required_guarantees(&display1)
.expect("Failed to extract required_guarantees");
let guarantees2 = extract_required_guarantees(&display2)
.expect("Failed to extract required_guarantees");

// Compare only the required_guarantees part (Because the predicate part will not be the same)
assert_eq!(guarantees1, guarantees2, "required_guarantees don't match");
// check the projection
assert_contains!(&explain, "projection=[c1]");
}
#[tokio::test]

@@ -1581,16 +1508,19 @@ mod tests {
.await;

// Should not contain a pruning predicate (since nothing can be pruned)
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(
pruning_predicate.is_none(),
"Still had pruning predicate: {pruning_predicate:?}"
);
let explain = rt.explain.unwrap();

// but does still has a pushdown down predicate
let predicate = rt.parquet_source.predicate();
let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref());
assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
// When both matched and pruned are 0, it means that the pruning predicate
// was not used at all.
Comment on lines +1513 to +1514:

Not super intuitive (I'd prefer it was …)
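To make the meaning of these two counters concrete, here is a hypothetical helper (not part of this PR) that parses a named metric out of the explain text; when both row_groups_matched_statistics and row_groups_pruned_statistics read 0, the statistics-based pruning predicate was never applied to any row group:

```rust
/// Extract the integer value of `name=<digits>` from EXPLAIN ANALYZE text.
/// Hypothetical test helper, shown only to illustrate the metric semantics.
fn metric_value(explain: &str, name: &str) -> Option<u64> {
    let pattern = format!("{name}=");
    let start = explain.find(&pattern)? + pattern.len();
    let digits: String = explain[start..]
        .chars()
        .take_while(|c| c.is_ascii_digit())
        .collect();
    digits.parse().ok()
}

fn main() {
    // a fragment of explain output like the one asserted on below
    let explain = "row_groups_matched_statistics=0, row_groups_pruned_statistics=0";
    assert_eq!(metric_value(explain, "row_groups_matched_statistics"), Some(0));
    assert_eq!(metric_value(explain, "row_groups_pruned_statistics"), Some(0));
}
```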
assert_contains!(&explain, "row_groups_matched_statistics=0");
assert_contains!(&explain, "row_groups_pruned_statistics=0");

// But pushdown predicate should be present
assert_contains!(
&explain,
"predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
);
assert_contains!(&explain, "pushdown_rows_pruned=5");
}

#[tokio::test]
@@ -1616,8 +1546,14 @@ mod tests {
.await;

// Should have a pruning predicate
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());
let explain = rt.explain.unwrap();
assert_contains!(
&explain,
"predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END"
);

// And bloom filters should have been evaluated
assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
}

/// Returns the sum of all the metrics with the specified name
Since I'm now using the actual explain plan to check that bloom filters are used, it's easiest to just enable them and check that they pruned. It's also a more realistic test, in that it's what I as a user would do to check whether bloom filters are working.
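For context on enabling them: with the Rust parquet crate, bloom filters are opt-in via WriterProperties. A hedged sketch of how a test writer could turn them on (the store_parquet helper used by these tests may configure its writer differently):

```rust
use parquet::file::properties::WriterProperties;

/// Writer properties that ask parquet-rs to write a bloom filter for every
/// column, so point predicates like `c1 = 'foo'` can prune row groups at read time.
fn bloom_filter_writer_props() -> WriterProperties {
    WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .build()
}
```

The resulting properties would then be passed, wrapped in Some, as the props argument of parquet::arrow::ArrowWriter::try_new when writing the test batches.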