Skip to content

Commit 2f43476

Browse files
Initial Extract parquet data page statistics API (#10852)
* feat: enable page statistics * feat: prototype int64 data_page_min * feat: prototype MinInt64DataPageStatsIterator * feat: add make_data_page_stats_iterator macro * feat: add get_data_page_statistics macro * feat: add MaxInt64DataPageStatsIterator * feat: add test_data_page_stats param * chore: add testcase int64_with_nulls * feat: add data page null_counts * fix: clippy * chore: rename column_page_index * feat: add data page row counts * feat: add num_data_pages to iterator * chore: update docs * fix: use colum_offset len in data_page_null_counts * fix: docs * tweak comments * update test helper * Add explicit multi-data page tests to statistics test * Add explicit data page test * remove duplicate test * update coverage --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent ebca681 commit 2f43476

File tree

3 files changed

+657
-140
lines changed

3 files changed

+657
-140
lines changed

datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

Lines changed: 312 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use arrow_array::{
3333
use arrow_schema::{Field, FieldRef, Schema, TimeUnit};
3434
use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result};
3535
use half::f16;
36-
use parquet::file::metadata::RowGroupMetaData;
36+
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
37+
use parquet::file::page_index::index::Index;
3738
use parquet::file::statistics::Statistics as ParquetStatistics;
3839
use parquet::schema::types::SchemaDescriptor;
3940
use paste::paste;
@@ -517,6 +518,74 @@ macro_rules! get_statistics {
517518
}}}
518519
}
519520

521+
macro_rules! make_data_page_stats_iterator {
522+
($iterator_type: ident, $func: ident, $index_type: path, $stat_value_type: ty) => {
523+
struct $iterator_type<'a, I>
524+
where
525+
I: Iterator<Item = (usize, &'a Index)>,
526+
{
527+
iter: I,
528+
}
529+
530+
impl<'a, I> $iterator_type<'a, I>
531+
where
532+
I: Iterator<Item = (usize, &'a Index)>,
533+
{
534+
fn new(iter: I) -> Self {
535+
Self { iter }
536+
}
537+
}
538+
539+
impl<'a, I> Iterator for $iterator_type<'a, I>
540+
where
541+
I: Iterator<Item = (usize, &'a Index)>,
542+
{
543+
type Item = Vec<Option<$stat_value_type>>;
544+
545+
fn next(&mut self) -> Option<Self::Item> {
546+
let next = self.iter.next();
547+
match next {
548+
Some((len, index)) => match index {
549+
$index_type(native_index) => Some(
550+
native_index
551+
.indexes
552+
.iter()
553+
.map(|x| x.$func)
554+
.collect::<Vec<_>>(),
555+
),
556+
// No matching `Index` found;
557+
// thus no statistics that can be extracted.
558+
// We return vec![None; len] to effectively
559+
// create an arrow null-array with the length
560+
// corresponding to the number of entries in
561+
// `ParquetOffsetIndex` per row group per column.
562+
_ => Some(vec![None; len]),
563+
},
564+
_ => None,
565+
}
566+
}
567+
568+
fn size_hint(&self) -> (usize, Option<usize>) {
569+
self.iter.size_hint()
570+
}
571+
}
572+
};
573+
}
574+
575+
make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
576+
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);
577+
578+
macro_rules! get_data_page_statistics {
579+
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
580+
paste! {
581+
match $data_type {
582+
Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
583+
_ => unimplemented!()
584+
}
585+
}
586+
}
587+
}
588+
520589
/// Lookups up the parquet column by name
521590
///
522591
/// Returns the parquet column index and the corresponding arrow field
@@ -563,6 +632,51 @@ fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
563632
get_statistics!(Max, data_type, iterator)
564633
}
565634

635+
/// Extracts the min statistics from an iterator
636+
/// of parquet page [`Index`]'es to an [`ArrayRef`]
637+
pub(crate) fn min_page_statistics<'a, I>(
638+
data_type: Option<&DataType>,
639+
iterator: I,
640+
) -> Result<ArrayRef>
641+
where
642+
I: Iterator<Item = (usize, &'a Index)>,
643+
{
644+
get_data_page_statistics!(Min, data_type, iterator)
645+
}
646+
647+
/// Extracts the max statistics from an iterator
648+
/// of parquet page [`Index`]'es to an [`ArrayRef`]
649+
pub(crate) fn max_page_statistics<'a, I>(
650+
data_type: Option<&DataType>,
651+
iterator: I,
652+
) -> Result<ArrayRef>
653+
where
654+
I: Iterator<Item = (usize, &'a Index)>,
655+
{
656+
get_data_page_statistics!(Max, data_type, iterator)
657+
}
658+
659+
/// Extracts the null count statistics from an iterator
660+
/// of parquet page [`Index`]'es to an [`ArrayRef`]
661+
///
662+
/// The returned Array is an [`UInt64Array`]
663+
pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result<ArrayRef>
664+
where
665+
I: Iterator<Item = (usize, &'a Index)>,
666+
{
667+
let iter = iterator.flat_map(|(len, index)| match index {
668+
Index::NONE => vec![None; len],
669+
Index::INT64(native_index) => native_index
670+
.indexes
671+
.iter()
672+
.map(|x| x.null_count.map(|x| x as u64))
673+
.collect::<Vec<_>>(),
674+
_ => unimplemented!(),
675+
});
676+
677+
Ok(Arc::new(UInt64Array::from_iter(iter)))
678+
}
679+
566680
/// Extracts Parquet statistics as Arrow arrays
567681
///
568682
/// This is used to convert Parquet statistics to Arrow arrays, with proper type
@@ -771,10 +885,205 @@ impl<'a> StatisticsConverter<'a> {
771885
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
772886
}
773887

888+
/// Extract the minimum values from Data Page statistics.
889+
///
890+
/// In Parquet files, in addition to the Column Chunk level statistics
891+
/// (stored for each column for each row group) there are also
892+
/// optional statistics stored for each data page, as part of
893+
/// the [`ParquetColumnIndex`].
894+
///
895+
/// Since a single Column Chunk is stored as one or more pages,
896+
/// page level statistics can prune at a finer granularity.
897+
///
898+
/// However since they are stored in a separate metadata
899+
/// structure ([`Index`]) there is different code to extract them as
900+
/// compared to arrow statistics.
901+
///
902+
/// # Parameters:
903+
///
904+
/// * `column_page_index`: The parquet column page indices, read from
905+
/// `ParquetMetaData` column_index
906+
///
907+
/// * `column_offset_index`: The parquet column offset indices, read from
908+
/// `ParquetMetaData` offset_index
909+
///
910+
/// * `row_group_indices`: The indices of the row groups, that are used to
911+
/// extract the column page index and offset index on a per row group
912+
/// per column basis.
913+
///
914+
/// # Return Value
915+
///
916+
/// The returned array contains 1 value for each `NativeIndex`
917+
/// in the underlying `Index`es, in the same order as they appear
918+
/// in `metadatas`.
919+
///
920+
/// For example, if there are two `Index`es in `metadatas`:
921+
/// 1. the first having `3` `PageIndex` entries
922+
/// 2. the second having `2` `PageIndex` entries
923+
///
924+
/// The returned array would have 5 rows.
925+
///
926+
/// Each value is either:
927+
/// * the minimum value for the page
928+
/// * a null value, if the statistics can not be extracted
929+
///
930+
/// Note that a null value does NOT mean the min value was actually
931+
/// `null` it means it the requested statistic is unknown
932+
///
933+
/// # Errors
934+
///
935+
/// Reasons for not being able to extract the statistics include:
936+
/// * the column is not present in the parquet file
937+
/// * statistics for the pages are not present in the row group
938+
/// * the stored statistic value can not be converted to the requested type
939+
pub fn data_page_mins<I>(
940+
&self,
941+
column_page_index: &ParquetColumnIndex,
942+
column_offset_index: &ParquetOffsetIndex,
943+
row_group_indices: I,
944+
) -> Result<ArrayRef>
945+
where
946+
I: IntoIterator<Item = &'a usize>,
947+
{
948+
let data_type = self.arrow_field.data_type();
949+
950+
let Some(parquet_index) = self.parquet_index else {
951+
return Ok(self.make_null_array(data_type, row_group_indices));
952+
};
953+
954+
let iter = row_group_indices.into_iter().map(|rg_index| {
955+
let column_page_index_per_row_group_per_column =
956+
&column_page_index[*rg_index][parquet_index];
957+
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
958+
959+
(*num_data_pages, column_page_index_per_row_group_per_column)
960+
});
961+
962+
min_page_statistics(Some(data_type), iter)
963+
}
964+
965+
/// Extract the maximum values from Data Page statistics.
966+
///
967+
/// See docs on [`Self::data_page_mins`] for details.
968+
pub fn data_page_maxes<I>(
969+
&self,
970+
column_page_index: &ParquetColumnIndex,
971+
column_offset_index: &ParquetOffsetIndex,
972+
row_group_indices: I,
973+
) -> Result<ArrayRef>
974+
where
975+
I: IntoIterator<Item = &'a usize>,
976+
{
977+
let data_type = self.arrow_field.data_type();
978+
979+
let Some(parquet_index) = self.parquet_index else {
980+
return Ok(self.make_null_array(data_type, row_group_indices));
981+
};
982+
983+
let iter = row_group_indices.into_iter().map(|rg_index| {
984+
let column_page_index_per_row_group_per_column =
985+
&column_page_index[*rg_index][parquet_index];
986+
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
987+
988+
(*num_data_pages, column_page_index_per_row_group_per_column)
989+
});
990+
991+
max_page_statistics(Some(data_type), iter)
992+
}
993+
994+
/// Extract the null counts from Data Page statistics.
995+
///
996+
/// The returned Array is an [`UInt64Array`]
997+
///
998+
/// See docs on [`Self::data_page_mins`] for details.
999+
pub fn data_page_null_counts<I>(
1000+
&self,
1001+
column_page_index: &ParquetColumnIndex,
1002+
column_offset_index: &ParquetOffsetIndex,
1003+
row_group_indices: I,
1004+
) -> Result<ArrayRef>
1005+
where
1006+
I: IntoIterator<Item = &'a usize>,
1007+
{
1008+
let data_type = self.arrow_field.data_type();
1009+
1010+
let Some(parquet_index) = self.parquet_index else {
1011+
return Ok(self.make_null_array(data_type, row_group_indices));
1012+
};
1013+
1014+
let iter = row_group_indices.into_iter().map(|rg_index| {
1015+
let column_page_index_per_row_group_per_column =
1016+
&column_page_index[*rg_index][parquet_index];
1017+
let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
1018+
1019+
(*num_data_pages, column_page_index_per_row_group_per_column)
1020+
});
1021+
null_counts_page_statistics(iter)
1022+
}
1023+
1024+
/// Returns an [`ArrayRef`] with row counts for each row group.
1025+
///
1026+
/// This function iterates over the given row group indexes and computes
1027+
/// the row count for each page in the specified column.
1028+
///
1029+
/// # Parameters:
1030+
///
1031+
/// * `column_offset_index`: The parquet column offset indices, read from
1032+
/// `ParquetMetaData` offset_index
1033+
///
1034+
/// * `row_group_metadatas`: The metadata slice of the row groups, read
1035+
/// from `ParquetMetaData` row_groups
1036+
///
1037+
/// * `row_group_indices`: The indices of the row groups, that are used to
1038+
/// extract the column offset index on a per row group per column basis.
1039+
///
1040+
/// See docs on [`Self::data_page_mins`] for details.
1041+
pub fn data_page_row_counts<I>(
1042+
&self,
1043+
column_offset_index: &ParquetOffsetIndex,
1044+
row_group_metadatas: &[RowGroupMetaData],
1045+
row_group_indices: I,
1046+
) -> Result<ArrayRef>
1047+
where
1048+
I: IntoIterator<Item = &'a usize>,
1049+
{
1050+
let data_type = self.arrow_field.data_type();
1051+
1052+
let Some(parquet_index) = self.parquet_index else {
1053+
return Ok(self.make_null_array(data_type, row_group_indices));
1054+
};
1055+
1056+
// `offset_index[row_group_number][column_number][page_number]` holds
1057+
// the [`PageLocation`] corresponding to page `page_number` of column
1058+
// `column_number`of row group `row_group_number`.
1059+
let mut row_count_total = Vec::new();
1060+
for rg_idx in row_group_indices {
1061+
let page_locations = &column_offset_index[*rg_idx][parquet_index];
1062+
1063+
let row_count_per_page = page_locations.windows(2).map(|loc| {
1064+
Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64)
1065+
});
1066+
1067+
let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows();
1068+
1069+
// append the last page row count
1070+
let row_count_per_page = row_count_per_page
1071+
.chain(std::iter::once(Some(
1072+
*num_rows_in_row_group as u64
1073+
- page_locations.last().unwrap().first_row_index as u64,
1074+
)))
1075+
.collect::<Vec<_>>();
1076+
1077+
row_count_total.extend(row_count_per_page);
1078+
}
1079+
1080+
Ok(Arc::new(UInt64Array::from_iter(row_count_total)))
1081+
}
1082+
7741083
/// Returns a null array of data_type with one element per row group
775-
fn make_null_array<I>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
1084+
fn make_null_array<I, A>(&self, data_type: &DataType, metadatas: I) -> ArrayRef
7761085
where
777-
I: IntoIterator<Item = &'a RowGroupMetaData>,
1086+
I: IntoIterator<Item = A>,
7781087
{
7791088
// column was in the arrow schema but not in the parquet schema, so return a null array
7801089
let num_row_groups = metadatas.into_iter().count();

0 commit comments

Comments
 (0)