Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-examples/examples/data_io/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl PruningStatistics for ParquetMetadataIndex {
}

/// return the row counts for each file
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ends up having a lot of ramifications. IMO the existing API is flawed and we should fix it. I'm happy to do this in an isolated PR, but it does make the rest of this work much simpler.

Some(self.row_counts_ref().clone())
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/query_planning/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl PruningStatistics for MyCatalog {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
/// Return the number of rows in each container as a `UInt64Array`.
///
/// This example catalog tracks no per-file row counts, so it reports
/// "unknown" by returning `None` (the trait's convention for absent stats).
fn row_counts(&self) -> Option<ArrayRef> {
// In this example, we know nothing about the number of rows in each file
None
}
Expand Down
71 changes: 34 additions & 37 deletions datafusion/common/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,17 @@ pub trait PruningStatistics {
/// [`UInt64Array`]: arrow::array::UInt64Array
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Return the number of rows for the named column in each container
/// as an [`UInt64Array`].
/// Return the number of rows in each container as an [`UInt64Array`].
///
/// Row counts are container-level (not column-specific) — the value
/// is the same regardless of which column is being considered.
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
fn row_counts(&self) -> Option<ArrayRef>;

/// Returns [`BooleanArray`] where each row represents information known
/// about specific literal `values` in a column.
Expand Down Expand Up @@ -265,7 +267,7 @@ impl PruningStatistics for PartitionPruningStatistics {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
/// Partition values carry no information about how many rows each
/// container holds, so the row count is always unknown (`None`).
fn row_counts(&self) -> Option<ArrayRef> {
None
}

Expand Down Expand Up @@ -398,11 +400,7 @@ impl PruningStatistics for PrunableStatistics {
}
}

fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// If the column does not exist in the schema, return None
if self.schema.index_of(column.name()).is_err() {
return None;
}
fn row_counts(&self) -> Option<ArrayRef> {
if self
.statistics
.iter()
Expand Down Expand Up @@ -502,9 +500,9 @@ impl PruningStatistics for CompositePruningStatistics {
None
}

fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.row_counts(column) {
if let Some(array) = stats.row_counts() {
return Some(array);
}
}
Expand Down Expand Up @@ -566,9 +564,9 @@ mod tests {

// Partition values don't know anything about nulls or row counts
assert!(partition_stats.null_counts(&column_a).is_none());
assert!(partition_stats.row_counts(&column_a).is_none());
assert!(partition_stats.row_counts().is_none());
assert!(partition_stats.null_counts(&column_b).is_none());
assert!(partition_stats.row_counts(&column_b).is_none());
assert!(partition_stats.row_counts().is_none());

// Min/max values are the same as the partition values
let min_values_a =
Expand Down Expand Up @@ -709,9 +707,9 @@ mod tests {

// Partition values don't know anything about nulls or row counts
assert!(partition_stats.null_counts(&column_a).is_none());
assert!(partition_stats.row_counts(&column_a).is_none());
assert!(partition_stats.row_counts().is_none());
assert!(partition_stats.null_counts(&column_b).is_none());
assert!(partition_stats.row_counts(&column_b).is_none());
assert!(partition_stats.row_counts().is_none());

// Min/max values are all missing
assert!(partition_stats.min_values(&column_a).is_none());
Expand Down Expand Up @@ -814,13 +812,13 @@ mod tests {
assert_eq!(null_counts_b, expected_null_counts_b);

// Row counts are the same as the statistics
let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
let row_counts_a = as_uint64_array(&pruning_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_a = vec![Some(100), Some(200)];
assert_eq!(row_counts_a, expected_row_counts_a);
let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
let row_counts_b = as_uint64_array(&pruning_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
Expand All @@ -845,20 +843,21 @@ mod tests {
// `row_counts` is container-level and no longer takes a `Column` argument,
// so the counts reported for column `c` (which has no per-column statistics)
// are the same container-level counts as for any other column.
let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap())
let row_counts_c = as_uint64_array(&pruning_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_c = vec![Some(100), Some(200)];
assert_eq!(row_counts_c, expected_row_counts_c);
assert!(pruning_stats.contained(&column_c, &values).is_none());

// Test with a column that doesn't exist
// Test with a column that doesn't exist — column-specific stats
// return None, but row_counts is container-level and still available
let column_d = Column::new_unqualified("d");
assert!(pruning_stats.min_values(&column_d).is_none());
assert!(pruning_stats.max_values(&column_d).is_none());
assert!(pruning_stats.null_counts(&column_d).is_none());
assert!(pruning_stats.row_counts(&column_d).is_none());
assert!(pruning_stats.row_counts().is_some());
assert!(pruning_stats.contained(&column_d, &values).is_none());
}

Expand Down Expand Up @@ -886,8 +885,8 @@ mod tests {
assert!(pruning_stats.null_counts(&column_b).is_none());

// Row counts are all missing
assert!(pruning_stats.row_counts(&column_a).is_none());
assert!(pruning_stats.row_counts(&column_b).is_none());
assert!(pruning_stats.row_counts().is_none());
assert!(pruning_stats.row_counts().is_none());

// Contained values are all empty
let values = HashSet::from([ScalarValue::from(1i32)]);
Expand Down Expand Up @@ -1027,13 +1026,11 @@ mod tests {
let expected_null_counts_col_x = vec![Some(0), Some(10)];
assert_eq!(null_counts_col_x, expected_null_counts_col_x);

// Test row counts - only available from file statistics
assert!(composite_stats.row_counts(&part_a).is_none());
let row_counts_col_x =
as_uint64_array(&composite_stats.row_counts(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
// Test row counts — container-level, available from file statistics
let row_counts_col_x = as_uint64_array(&composite_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(100), Some(200)];
assert_eq!(row_counts_col_x, expected_row_counts);

Expand All @@ -1046,12 +1043,13 @@ mod tests {
// File statistics don't implement contained
assert!(composite_stats.contained(&col_x, &values).is_none());

// Non-existent column should return None for everything
// Non-existent column should return None for column-specific stats,
// but row_counts is container-level and still available
let non_existent = Column::new_unqualified("non_existent");
assert!(composite_stats.min_values(&non_existent).is_none());
assert!(composite_stats.max_values(&non_existent).is_none());
assert!(composite_stats.null_counts(&non_existent).is_none());
assert!(composite_stats.row_counts(&non_existent).is_none());
assert!(composite_stats.row_counts().is_some());
assert!(composite_stats.contained(&non_existent, &values).is_none());

// Verify num_containers matches
Expand Down Expand Up @@ -1155,7 +1153,7 @@ mod tests {
let expected_null_counts = vec![Some(0), Some(5)];
assert_eq!(null_counts, expected_null_counts);

let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap())
let row_counts = as_uint64_array(&composite_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
Expand Down Expand Up @@ -1195,11 +1193,10 @@ mod tests {
let expected_null_counts = vec![Some(10), Some(20)];
assert_eq!(null_counts, expected_null_counts);

let row_counts =
as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let row_counts = as_uint64_array(&composite_stats_reversed.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(1000), Some(2000)];
assert_eq!(row_counts, expected_row_counts);
}
Expand Down
46 changes: 32 additions & 14 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use arrow::datatypes::{DataType, Schema};

/// Represents a value with a degree of certainty. `Precision` is used to
/// propagate information the precision of statistical values.
#[derive(Clone, PartialEq, Eq, Default, Copy)]
pub enum Precision<T: Debug + Clone + PartialEq + Eq + PartialOrd> {
#[derive(Clone, Default, Copy)]
pub enum Precision<T: Debug + Clone> {
/// The exact value is known. Used for guaranteeing correctness.
///
/// Comes from definitive sources such as:
Expand Down Expand Up @@ -60,7 +60,7 @@ pub enum Precision<T: Debug + Clone + PartialEq + Eq + PartialOrd> {
Absent,
}

impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Precision<T> {
impl<T: Debug + Clone> Precision<T> {
/// If we have some value (exact or inexact), it returns that value.
/// Otherwise, it returns `None`.
pub fn get_value(&self) -> Option<&T> {
Expand All @@ -75,7 +75,7 @@ impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Precision<T> {
pub fn map<U, F>(self, f: F) -> Precision<U>
where
F: Fn(T) -> U,
U: Debug + Clone + PartialEq + Eq + PartialOrd,
U: Debug + Clone,
{
match self {
Precision::Exact(val) => Precision::Exact(f(val)),
Expand All @@ -94,6 +94,16 @@ impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Precision<T> {
}
}

/// Demotes the precision state from exact to inexact (if present).
///
/// `Inexact` and `Absent` values are returned unchanged.
pub fn to_inexact(self) -> Self {
    if let Precision::Exact(value) = self {
        Precision::Inexact(value)
    } else {
        self
    }
}
}

impl<T: Debug + Clone + PartialOrd> Precision<T> {
/// Returns the maximum of two (possibly inexact) values, conservatively
/// propagating exactness information. If one of the input values is
/// [`Precision::Absent`], the result is `Absent` too.
Expand Down Expand Up @@ -127,14 +137,6 @@ impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Precision<T> {
(_, _) => Precision::Absent,
}
}

/// Demotes the precision state from exact to inexact (if present).
pub fn to_inexact(self) -> Self {
match self {
Precision::Exact(value) => Precision::Inexact(value),
_ => self,
}
}
}

impl Precision<usize> {
Expand Down Expand Up @@ -318,7 +320,23 @@ impl Precision<ScalarValue> {
}
}

impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Debug for Precision<T> {
impl<T: Debug + Clone> PartialEq for Precision<T>
where
T: PartialEq,
{
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Precision::Exact(a), Precision::Exact(b)) => a == b,
(Precision::Inexact(a), Precision::Inexact(b)) => a == b,
(Precision::Absent, Precision::Absent) => true,
_ => false,
}
}
}

// `Eq` holds whenever `T: Eq`: the manual `PartialEq` above compares
// like-variants' inner values with `T::eq`, which is a total equivalence
// relation for `Eq` types, and variant mismatches are consistently `false`.
impl<T: Debug + Clone + Eq> Eq for Precision<T> {}

impl<T: Debug + Clone> Debug for Precision<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Precision::Exact(inner) => write!(f, "Exact({inner:?})"),
Expand All @@ -328,7 +346,7 @@ impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Debug for Precision<T> {
}
}

impl<T: Debug + Clone + PartialEq + Eq + PartialOrd> Display for Precision<T> {
impl<T: Debug + Clone> Display for Precision<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Precision::Exact(inner) => write!(f, "Exact({inner:?})"),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource-parquet/src/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ impl PruningStatistics for PagesPruningStatistics<'_> {
}
}

fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
match self.converter.data_page_row_counts(
self.offset_index,
self.row_group_metadatas,
Expand Down
18 changes: 9 additions & 9 deletions datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use super::{ParquetAccessPlan, ParquetFileMetrics};
use arrow::array::{ArrayRef, BooleanArray};
use arrow::array::{ArrayRef, BooleanArray, UInt64Array};
use arrow::datatypes::Schema;
use datafusion_common::pruning::PruningStatistics;
use datafusion_common::{Column, Result, ScalarValue};
Expand Down Expand Up @@ -536,7 +536,7 @@ impl PruningStatistics for BloomFilterStatistics {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
/// Bloom filter statistics cannot answer row-count questions, so the
/// count for every container is unknown (`None`).
fn row_counts(&self) -> Option<ArrayRef> {
None
}

Expand Down Expand Up @@ -626,13 +626,13 @@ impl PruningStatistics for RowGroupPruningStatistics<'_> {
.map(|counts| Arc::new(counts) as ArrayRef)
}

fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// row counts are the same for all columns in a row group
self.statistics_converter(column)
.and_then(|c| Ok(c.row_group_row_counts(self.metadata_iter())?))
.ok()
.flatten()
.map(|counts| Arc::new(counts) as ArrayRef)
fn row_counts(&self) -> Option<ArrayRef> {
// Row counts are container-level — read directly from row group metadata.
let counts: UInt64Array = self
.metadata_iter()
.map(|rg| Some(rg.num_rows() as u64))
.collect();
Some(Arc::new(counts) as ArrayRef)
}

fn contained(
Expand Down
5 changes: 5 additions & 0 deletions datafusion/pruning/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ workspace = true

[dependencies]
arrow = { workspace = true }
async-trait = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-datasource = { workspace = true }
datafusion-expr = { workspace = true, default-features = true }
datafusion-expr-common = { workspace = true, default-features = true }
datafusion-functions-aggregate = { workspace = true, default-features = true }
datafusion-functions-nested = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
Expand All @@ -30,3 +34,4 @@ datafusion-expr = { workspace = true }
datafusion-functions-nested = { workspace = true }
insta = { workspace = true }
itertools = { workspace = true }
tokio = { workspace = true }
Loading
Loading