pipe column orderings into pruning predicate creation #15821

Open. This pull request wants to merge 23 commits into main.

Changes from all commits (23 commits):

- 6be6ff7 pipe column orderings into pruning predicate creation (adriangb, Apr 23, 2025)
- 8c2ceb1 touch up (adriangb, Apr 23, 2025)
- 6bfaeb1 Merge remote-tracking branch 'origin/main' into float_pruning_no_total (etseidl, Apr 24, 2025)
- 849f18f start adding rules for floats (etseidl, Apr 24, 2025)
- dada2ce cleanup and add more comments (etseidl, Apr 24, 2025)
- 4c82fbf more cleanup (etseidl, Apr 25, 2025)
- 347fad3 fix some compile problems (etseidl, Apr 25, 2025)
- de60fb4 fix examples (etseidl, Apr 26, 2025)
- de81a92 rework column_index_for_expr (etseidl, Apr 28, 2025)
- da34493 check for NegativeExpr too (etseidl, Apr 28, 2025)
- 9726505 modify float tests that will no longer be pruned (etseidl, Apr 28, 2025)
- 081920a fix comment (etseidl, Apr 28, 2025)
- 2d4f979 Merge remote-tracking branch 'origin/main' into column-orders-parquet… (etseidl, May 19, 2025)
- 4303f11 revert doc changes for now (etseidl, May 19, 2025)
- c43000a fix new call to build_pruning_predicates (etseidl, May 19, 2025)
- 4392191 clippy (etseidl, May 20, 2025)
- 56893fc Merge branch 'main' into column-orders-parquet-stats (etseidl, May 20, 2025)
- 5c781f8 add some tests (etseidl, May 20, 2025)
- 56eb96a Merge remote-tracking branch 'origin/main' into column-orders-parquet… (etseidl, May 20, 2025)
- d74b8ac Merge remote-tracking branch 'origin/main' into column-orders-parquet… (etseidl, May 21, 2025)
- 713214c fix for logical schema (etseidl, May 21, 2025)
- c546d95 Merge branch 'main' into column-orders-parquet-stats (etseidl, May 21, 2025)
- 63ddf1b Merge branch 'main' into column-orders-parquet-stats (etseidl, May 29, 2025)
datafusion-examples/examples/advanced_parquet_index.rs (6 additions, 3 deletions)

@@ -46,7 +46,7 @@ use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::utils::{Guarantee, LiteralGuarantee};
use datafusion::physical_expr::PhysicalExpr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
@@ -300,8 +300,11 @@ impl IndexTableProvider {
// In this example, we use the PruningPredicate's literal guarantees to
// analyze the predicate. In a real system, using
// `PruningPredicate::prune` would likely be easier to do.
-let pruning_predicate =
-    PruningPredicate::try_new(Arc::clone(predicate), self.schema())?;
+let pruning_predicate = PruningPredicate::try_new(
+    Arc::clone(predicate),
+    self.schema(),
+    vec![ColumnOrdering::Unknown; self.schema().fields().len()],
+)?;
adriangb (Contributor, Author) commented on lines +303 to +307:
We could add a new signature to avoid API churn, but I wanted to make it explicit for now to see all of the callsites


// The PruningPredicate's guarantees must all be satisfied in order for
// the predicate to possibly evaluate to true.
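A note for readers following the diff: the ColumnOrdering type used at every call site here is new in datafusion::physical_optimizer::pruning, but its definition does not appear in this excerpt. Below is a plausible reconstruction from the variants visible in the diff (Signed, Unsigned, Undefined, Unknown, plus a TotalOrder variant in a commented-out TODO later on); treat it as a sketch, not the actual source.

// Sketch only: reconstructed from call sites in this PR, not the real definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnOrdering {
    /// Min/max statistics were computed with signed comparisons
    Signed,
    /// Min/max statistics were computed with unsigned (byte-wise) comparisons
    Unsigned,
    /// The file declares a type-defined order that is undefined for this type
    Undefined,
    /// No ordering information is available, so pruning must stay conservative
    Unknown,
}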
datafusion-examples/examples/parquet_index.rs (6 additions, 3 deletions)

@@ -40,7 +40,7 @@ use datafusion::parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
};
use datafusion::physical_expr::PhysicalExpr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use std::any::Any;
@@ -366,8 +366,11 @@ impl ParquetMetadataIndex {
) -> Result<Vec<(&str, u64)>> {
// Use the PruningPredicate API to determine which files can not
// possibly have any relevant data.
-let pruning_predicate =
-    PruningPredicate::try_new(predicate, self.schema().clone())?;
+let pruning_predicate = PruningPredicate::try_new(
+    predicate,
+    self.schema().clone(),
+    vec![ColumnOrdering::Unknown; self.schema().fields().len()],
+)?;

// Now evaluate the pruning predicate into a boolean mask, one element per
// file in the index. If the mask is true, the file may have rows that
datafusion-examples/examples/pruning.rs (7 additions, 2 deletions)

@@ -24,7 +24,7 @@ use datafusion::common::pruning::PruningStatistics;
use datafusion::common::{DFSchema, ScalarValue};
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
use datafusion::prelude::*;

/// This example shows how to use DataFusion's `PruningPredicate` to prove
@@ -190,7 +190,12 @@ fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate
let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
let props = ExecutionProps::new();
let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
-PruningPredicate::try_new(physical_expr, schema.clone()).unwrap()
+PruningPredicate::try_new(
+    physical_expr,
+    schema.clone(),
+    vec![ColumnOrdering::Unknown; schema.fields().len()],
+)
+.unwrap()
}

fn i32_array<'a>(values: impl Iterator<Item = &'a Option<i32>>) -> ArrayRef {
datafusion/core/tests/parquet/page_pruning.rs (11 additions, 3 deletions)

@@ -661,11 +661,15 @@ uint_tests!(64);
// page-2 0 0.0 4.0
// page-3 0 5.0 9.0
async fn prune_f64_lt() {
+// TODO: because NaN could be present but not accounted for in the statistics, these
+// expressions should not be pruned at present. When IEEE 754 total order is added to
+// the Parquet spec this can be revisited.
+// See https://github.com/apache/parquet-format/pull/221
test_prune(
Scenario::Float64,
"SELECT * FROM t where f < 1",
Some(0),
-Some(5),
+Some(0),
11,
5,
)
@@ -674,7 +678,7 @@
Scenario::Float64,
"SELECT * FROM t where -f > -1",
Some(0),
-Some(5),
+Some(0),
11,
5,
)
Expand All @@ -683,13 +687,17 @@ async fn prune_f64_lt() {

#[tokio::test]
async fn prune_f64_scalar_fun_and_gt() {
+// TODO: because NaN could be present but not accounted for in the statistics, this
+// expression should not be pruned at present. When IEEE 754 total order is added to
+// the Parquet spec this can be revisited.
+// See https://github.com/apache/parquet-format/pull/221
// result of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
// only use "f >= 0" to prune
test_prune(
Scenario::Float64,
"SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1",
Some(0),
-Some(10),
+Some(0),
1,
5,
)
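The TODO comments above carry the core reasoning of this PR: Parquet min/max statistics for floats are unreliable when NaN may be present, so these tests now expect no pruning. A minimal, self-contained illustration in plain Rust (independent of DataFusion and Parquet) of why partial-order min/max cannot be trusted:

fn max_by_gt(values: &[f64]) -> f64 {
    // "max" via partial-order comparison, as a naive statistics writer might compute it
    values
        .iter()
        .copied()
        .reduce(|acc, v| if acc > v { acc } else { v })
        .unwrap()
}

fn main() {
    // Every comparison involving NaN is false, so the result depends on where
    // NaN appears in the input:
    assert_eq!(max_by_gt(&[1.0, f64::NAN, 3.0]), 3.0); // NaN skipped
    assert!(max_by_gt(&[1.0, 3.0, f64::NAN]).is_nan()); // NaN wins

    // A recorded max (or min) therefore may or may not account for NaN rows,
    // and a pruning decision based on it can silently drop data.
}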
datafusion/core/tests/parquet/row_group_pruning.rs (15 additions, 6 deletions)

@@ -635,12 +635,16 @@ async fn prune_uint32_eq_large_in_list() {

#[tokio::test]
async fn prune_f64_lt() {
+// TODO: because NaN could be present but not accounted for in the statistics, these
+// expressions should not be pruned at present. When IEEE 754 total order is added to
+// the Parquet spec this can be revisited.
+// See https://github.com/apache/parquet-format/pull/221
RowGroupPruningTest::new()
.with_scenario(Scenario::Float64)
.with_query("SELECT * FROM t where f < 1")
.with_expected_errors(Some(0))
-.with_matched_by_stats(Some(3))
-.with_pruned_by_stats(Some(1))
+.with_matched_by_stats(Some(0))
+.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(11)
@@ -650,8 +654,8 @@
.with_scenario(Scenario::Float64)
.with_query("SELECT * FROM t where -f > -1")
.with_expected_errors(Some(0))
-.with_matched_by_stats(Some(3))
-.with_pruned_by_stats(Some(1))
+.with_matched_by_stats(Some(0))
+.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(11)
@@ -661,14 +665,19 @@

#[tokio::test]
async fn prune_f64_scalar_fun_and_gt() {
+// TODO: because NaN could be present but not accounted for in the statistics, this
+// expression should not be pruned at present. When IEEE 754 total order is added to
+// the Parquet spec this can be revisited.
+// See https://github.com/apache/parquet-format/pull/221
+
// result of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
// only use "f >= 0" to prune
RowGroupPruningTest::new()
.with_scenario(Scenario::Float64)
.with_query("SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1")
.with_expected_errors(Some(0))
-.with_matched_by_stats(Some(2))
-.with_pruned_by_stats(Some(2))
+.with_matched_by_stats(Some(0))
+.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(1)
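For context on the fix these TODOs anticipate: IEEE 754 defines a totalOrder predicate under which every value, including both NaNs and both zeros, has a definite position. Rust already exposes it as f64::total_cmp, so here is a quick sketch of the ordering that the linked parquet-format proposal would let writers declare:

use std::cmp::Ordering;

fn main() {
    // totalOrder: -NaN < -inf < ... < -0.0 < +0.0 < ... < +inf < +NaN
    assert_eq!(f64::NAN.total_cmp(&f64::INFINITY), Ordering::Greater);
    assert_eq!((-f64::NAN).total_cmp(&f64::NEG_INFINITY), Ordering::Less);
    assert_eq!((-0.0_f64).total_cmp(&0.0), Ordering::Less);

    // The ordinary partial order guarantees none of this:
    assert!(!(f64::NAN > f64::INFINITY));
    assert!(-0.0_f64 == 0.0); // the two zeros are indistinguishable

    // Under total order, float min/max statistics are well defined even with
    // NaN present, which is what the commented-out TotalOrder variant is for.
}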
datafusion/datasource-parquet/src/opener.rs (43 additions, 3 deletions)

@@ -33,14 +33,15 @@ use arrow::datatypes::{SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use datafusion_common::{exec_err, Result};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_optimizer::pruning::PruningPredicate;
+use datafusion_physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};

use futures::{StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::basic::{ColumnOrder, SortOrder};
use parquet::file::metadata::ParquetMetaDataReader;

/// Implements [`FileOpener`] for a parquet file
@@ -179,10 +180,42 @@ impl FileOpener for ParquetOpener {
}
}

+// Map column ordering from physical schema to logical
+let ordering: Vec<ColumnOrdering> = if let Some(column_orders) =
+    reader_metadata.metadata().file_metadata().column_orders()
+{
+    logical_file_schema
+        .fields()
+        .iter()
+        .map(|field| {
+            match physical_file_schema.index_of(field.name()) {
+                Ok(idx) => match column_orders[idx] {
+                    ColumnOrder::TYPE_DEFINED_ORDER(sort_order) => {
+                        match sort_order {
+                            SortOrder::SIGNED => ColumnOrdering::Signed,
+                            SortOrder::UNSIGNED => ColumnOrdering::Unsigned,
+                            _ => ColumnOrdering::Undefined,
+                        }
+                    }
+                    /* TODO(ets): for future
+                    ColumnOrder::IEEE_754_TOTAL_ORDER => {
+                        ColumnOrdering::TotalOrder
+                    }*/
+                    ColumnOrder::UNDEFINED => ColumnOrdering::Unknown,
+                },
+                _ => ColumnOrdering::Unknown,
+            }
+        })
+        .collect::<Vec<_>>()
+} else {
+    vec![ColumnOrdering::Unknown; logical_file_schema.fields().len()]
+};
+
// Build predicates for this specific file
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
predicate.as_ref(),
&logical_file_schema,
+ordering,
&predicate_creation_errors,
);

@@ -363,9 +396,11 @@ fn create_initial_plan(
pub(crate) fn build_pruning_predicate(
predicate: Arc<dyn PhysicalExpr>,
file_schema: &SchemaRef,
+column_orderings: Vec<ColumnOrdering>,
predicate_creation_errors: &Count,
) -> Option<Arc<PruningPredicate>> {
-match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+match PruningPredicate::try_new(predicate, Arc::clone(file_schema), column_orderings)
+{
Ok(pruning_predicate) => {
if !pruning_predicate.always_true() {
return Some(Arc::new(pruning_predicate));
@@ -385,16 +420,19 @@
pub(crate) fn build_page_pruning_predicate(
predicate: &Arc<dyn PhysicalExpr>,
file_schema: &SchemaRef,
+column_orderings: Vec<ColumnOrdering>,
) -> Arc<PagePruningAccessPlanFilter> {
Arc::new(PagePruningAccessPlanFilter::new(
predicate,
Arc::clone(file_schema),
+column_orderings,
))
}

pub(crate) fn build_pruning_predicates(
predicate: Option<&Arc<dyn PhysicalExpr>>,
file_schema: &SchemaRef,
+column_orderings: Vec<ColumnOrdering>,
predicate_creation_errors: &Count,
) -> (
Option<Arc<PruningPredicate>>,
@@ -406,9 +444,11 @@ pub(crate) fn build_pruning_predicates(
let pruning_predicate = build_pruning_predicate(
Arc::clone(predicate),
file_schema,
+column_orderings.clone(),
predicate_creation_errors,
);
-let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+let page_pruning_predicate =
+    build_page_pruning_predicate(predicate, file_schema, column_orderings);
(pruning_predicate, Some(page_pruning_predicate))
}

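An aside on why the SIGNED/UNSIGNED distinction threaded through the opener matters: the same stored bytes order differently under the two interpretations, so min/max written under one order is meaningless under the other. A tiny self-contained illustration:

fn main() {
    // 0x80 is the largest single byte under unsigned comparison but the
    // smallest under signed comparison:
    let raw: u8 = 0x80;
    assert_eq!(raw, 128u8); // unsigned view: top of the range
    assert_eq!(raw as i8, -128i8); // signed view: the very bottom

    // Byte-wise min/max recorded under UNSIGNED order cannot be compared
    // against predicate bounds with signed semantics (or vice versa), which
    // is why the opener maps the file's declared ColumnOrder before pruning.
}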
datafusion/datasource-parquet/src/page_filter.rs (7 additions, 2 deletions)

@@ -31,7 +31,7 @@ use arrow::{
use datafusion_common::pruning::PruningStatistics;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
-use datafusion_physical_optimizer::pruning::PruningPredicate;
+use datafusion_physical_optimizer::pruning::{ColumnOrdering, PruningPredicate};

use log::{debug, trace};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
@@ -119,14 +119,19 @@ pub struct PagePruningAccessPlanFilter {
impl PagePruningAccessPlanFilter {
/// Create a new [`PagePruningAccessPlanFilter`] from a physical
/// expression.
-pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
+pub fn new(
+    expr: &Arc<dyn PhysicalExpr>,
+    schema: SchemaRef,
+    column_orderings: Vec<ColumnOrdering>,
+) -> Self {
// extract any single column predicates
let predicates = split_conjunction(expr)
.into_iter()
.filter_map(|predicate| {
let pp = match PruningPredicate::try_new(
Arc::clone(predicate),
Arc::clone(&schema),
+column_orderings.clone(),
) {
Ok(pp) => pp,
Err(e) => {
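As context for the split_conjunction call in PagePruningAccessPlanFilter::new above: page-level pruning builds one PruningPredicate per conjunct of the predicate, which is why column_orderings is cloned inside the loop. Below is a rough usage sketch of the splitting step; module paths follow the umbrella datafusion crate and may shift between versions.

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion::physical_expr::{split_conjunction, PhysicalExpr};

fn main() -> datafusion::error::Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("f", DataType::Float64, false),
    ]);
    // Build the physical expression for: a > 5 AND f < 1.0
    let left: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("a", &schema)?, Operator::Gt, lit(5i32)));
    let right: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(col("f", &schema)?, Operator::Lt, lit(1.0_f64)));
    let conj: Arc<dyn PhysicalExpr> =
        Arc::new(BinaryExpr::new(left, Operator::And, right));

    // split_conjunction walks the AND tree and returns the leaf predicates;
    // the filter above would build a PruningPredicate (with its own clone of
    // the column orderings) for each of these.
    let parts = split_conjunction(&conj);
    assert_eq!(parts.len(), 2);
    Ok(())
}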