Skip to content
Open
34 changes: 33 additions & 1 deletion parquet/benches/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;
use parquet::basic::{Encoding, PageType, Type as PhysicalType};
use parquet::file::metadata::{
ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions,
ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData,
ParquetMetaDataReader, ParquetMetaDataWriter, ParquetStatisticsPolicy, RowGroupMetaData,
};
use parquet::file::statistics::Statistics;
use parquet::file::writer::TrackedWrite;
Expand Down Expand Up @@ -173,6 +173,23 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
c.bench_function("decode metadata with stats mask", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
.unwrap();
})
});

let options =
ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
c.bench_function("decode metadata with skip PES", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
.unwrap();
})
});

let buf: Bytes = black_box(encoded_meta()).into();
c.bench_function("decode parquet metadata (wide)", |b| {
b.iter(|| {
Expand All @@ -187,6 +204,21 @@ fn criterion_benchmark(c: &mut Criterion) {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});

let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(true);
c.bench_function("decode metadata (wide) with stats mask", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});

let options =
ParquetMetaDataOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
c.bench_function("decode metadata (wide) with skip PES", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
90 changes: 88 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
RowGroupMetaData,
ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
Expand Down Expand Up @@ -557,6 +557,29 @@ impl ArrowReaderOptions {
self
}

/// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask.
///
/// When enabled, the full per-page statistics list is distilled into a compact
/// [`EncodingMask`] during metadata decoding, which is much cheaper to decode and store.
/// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
/// might be desirable.
///
/// [`ColumnChunkMetaData::page_encoding_stats_mask`]:
/// crate::file::metadata::ColumnChunkMetaData::page_encoding_stats_mask
/// [`encoding_stats`]:
/// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
    // Delegates to the underlying metadata decoding options.
    self.metadata_options.set_encoding_stats_as_mask(val);
    self
}

/// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
///
/// The policy controls whether page encoding statistics are decoded for all columns,
/// skipped entirely, or skipped for all but a chosen set of columns
/// (see [`ParquetStatisticsPolicy`]).
///
/// [`encoding_stats`]:
/// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
    // Delegates to the underlying metadata decoding options.
    self.metadata_options.set_encoding_stats_policy(policy);
    self
}

/// Provide the file decryption properties to use when reading encrypted parquet files.
///
/// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
Expand Down Expand Up @@ -1420,7 +1443,7 @@ pub(crate) mod tests {
FloatType, Int32Type, Int64Type, Int96, Int96Type,
};
use crate::errors::Result;
use crate::file::metadata::ParquetMetaData;
use crate::file::metadata::{ParquetMetaData, ParquetStatisticsPolicy};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::file::writer::SerializedFileWriter;
use crate::schema::parser::parse_message_type;
Expand Down Expand Up @@ -1474,6 +1497,69 @@ pub(crate) mod tests {
assert_eq!(expected.as_ref(), builder.metadata.as_ref());
}

#[test]
fn test_page_encoding_stats_mask() {
    // Decode a known test file with page-encoding-stats reduced to a bitmask.
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/alltypes_tiny_pages.parquet")).unwrap();

    let options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    let row_group = builder.metadata.row_group(0);

    // Column 0 should report only PLAIN pages, column 2 only PLAIN_DICTIONARY pages.
    let mask = row_group.column(0).page_encoding_stats_mask().unwrap();
    assert!(mask.is_only(Encoding::PLAIN));

    let mask = row_group.column(2).page_encoding_stats_mask().unwrap();
    assert!(mask.is_only(Encoding::PLAIN_DICTIONARY));
}

#[test]
fn test_page_encoding_stats_skipped() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/alltypes_tiny_pages.parquet")).unwrap();

    // With SkipAll, neither the full stats nor the bitmask form should be present.
    let options =
        ArrowReaderOptions::new().with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll);
    let builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)
            .unwrap();
    for column in builder.metadata.row_group(0).columns() {
        assert!(column.page_encoding_stats().is_none());
        assert!(column.page_encoding_stats_mask().is_none());
    }

    // Skip every column except index 0, converting the surviving stats to a mask:
    // only column 0 should end up with a mask, and no column with full stats.
    let options = ArrowReaderOptions::new()
        .with_encoding_stats_as_mask(true)
        .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
    let builder =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file.try_clone().unwrap(), options)
            .unwrap();
    for (idx, column) in builder.metadata.row_group(0).columns().iter().enumerate() {
        assert!(column.page_encoding_stats().is_none());
        assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
    }
}

#[test]
fn test_arrow_reader_single_column() {
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
Expand Down
15 changes: 15 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,11 @@ impl EncodingMask {
self.0 & (1 << (val as i32)) != 0
}

/// Test if this mask has only the bit for the given [`Encoding`] set.
pub fn is_only(&self, val: Encoding) -> bool {
    // Equivalent to `self.0 == (1 << (val as i32))`: the bit for `val` is set
    // and it is the sole bit set in the mask.
    self.is_set(val) && self.0.count_ones() == 1
}

/// Test if all [`Encoding`]s in a given set are present in this mask.
pub fn all_set<'a>(&self, mut encodings: impl Iterator<Item = &'a Encoding>) -> bool {
encodings.all(|&e| self.is_set(e))
Expand Down Expand Up @@ -2510,4 +2515,14 @@ mod tests {
"Parquet error: Attempt to create invalid mask: 0x2"
);
}

#[test]
fn test_encoding_mask_is_only() {
    // A single-encoding mask is "only" that encoding...
    let plain_only = EncodingMask::new_from_encodings(std::iter::once(&Encoding::PLAIN));
    assert!(plain_only.is_only(Encoding::PLAIN));

    // ...but a mask containing two encodings is not "only" either of them.
    let two_encodings =
        EncodingMask::new_from_encodings([Encoding::PLAIN, Encoding::PLAIN_DICTIONARY].iter());
    assert!(!two_encodings.is_only(Encoding::PLAIN));
}
}
12 changes: 11 additions & 1 deletion parquet/src/file/metadata/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
use crate::basic::{BoundaryOrder, ColumnOrder, Compression, Encoding, PageType};
use crate::data_type::private::ParquetValueType;
use crate::file::metadata::{
ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats, RowGroupMetaData, SortingColumn,
ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats, ParquetPageEncodingStats,
RowGroupMetaData, SortingColumn,
};
use crate::file::page_index::column_index::{
ByteArrayColumnIndex, ColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
Expand Down Expand Up @@ -185,6 +186,15 @@ impl HeapSize for Encoding {
}
}

impl HeapSize for ParquetPageEncodingStats {
    fn heap_size(&self) -> usize {
        // Only the Full variant owns heap-allocated data; the mask variant is a
        // plain inline bitfield and contributes nothing to heap usage.
        if let Self::Full(stats) = self {
            stats.heap_size()
        } else {
            0
        }
    }
}

impl HeapSize for PageEncodingStats {
fn heap_size(&self) -> usize {
self.page_type.heap_size() + self.encoding.heap_size()
Expand Down
69 changes: 63 additions & 6 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ use crate::{
};

pub use footer_tail::FooterTail;
pub use options::ParquetMetaDataOptions;
pub use options::{ParquetMetaDataOptions, ParquetStatisticsPolicy};
pub use push_decoder::ParquetMetaDataPushDecoder;
pub use reader::{PageIndexPolicy, ParquetMetaDataReader};
use std::io::Write;
Expand Down Expand Up @@ -470,6 +470,16 @@ pub struct PageEncodingStats {
}
);

/// Internal representation of the page encoding stats in the [`ColumnChunkMetaData`].
///
/// This is not publicly exposed; instead a dedicated getter exists per variant
/// (`page_encoding_stats` for `Full`, `page_encoding_stats_mask` for `Mask`),
/// each returning `None` when the other variant is stored.
#[derive(Debug, Clone, PartialEq)]
enum ParquetPageEncodingStats {
    /// The full array of stats as defined in the Parquet spec.
    Full(Vec<PageEncodingStats>),
    /// A condensed version of only page encodings seen.
    Mask(EncodingMask),
}

/// Reference counted pointer for [`FileMetaData`].
pub type FileMetaDataPtr = Arc<FileMetaData>;

Expand Down Expand Up @@ -812,7 +822,7 @@ pub struct ColumnChunkMetaData {
dictionary_page_offset: Option<i64>,
statistics: Option<Statistics>,
geo_statistics: Option<Box<geo_statistics::GeospatialStatistics>>,
encoding_stats: Option<Vec<PageEncodingStats>>,
encoding_stats: Option<ParquetPageEncodingStats>,
bloom_filter_offset: Option<i64>,
bloom_filter_length: Option<i32>,
offset_index_offset: Option<i64>,
Expand Down Expand Up @@ -1050,10 +1060,47 @@ impl ColumnChunkMetaData {
self.geo_statistics.as_deref()
}

/// Returns the page encoding statistics, or `None` if no page encoding statistics
/// are available (or they were converted to a mask).
pub fn page_encoding_stats(&self) -> Option<&Vec<PageEncodingStats>> {
    // Only the Full variant exposes the spec-defined per-page stats list;
    // the mask form is accessible via `page_encoding_stats_mask` instead.
    match self.encoding_stats.as_ref() {
        Some(ParquetPageEncodingStats::Full(stats)) => Some(stats),
        _ => None,
    }
}

/// Returns the page encoding statistics reduced to a bitmask, or `None` if statistics are
/// not available (or they were left in their original form).
///
/// The [`PageEncodingStats`] struct was added to the Parquet specification specifically to
/// enable fast determination of whether all pages in a column chunk are dictionary encoded
/// (see <https://github.com/apache/parquet-format/pull/16>).
/// Decoding the full page encoding statistics, however, can be very costly, and is not
/// necessary to support the aforementioned use case. As an alternative, this crate can
/// instead distill the list of `PageEncodingStats` down to a bitmask of just the encodings
/// used for data pages
/// (see [`ParquetMetaDataOptions::set_encoding_stats_as_mask`]).
/// To test for an all-dictionary-encoded chunk one could use this bitmask in the following way:
///
/// ```rust
/// use parquet::basic::Encoding;
/// use parquet::file::metadata::ColumnChunkMetaData;
/// // test if all data pages in the column chunk are dictionary encoded
/// fn is_all_dictionary_encoded(col_meta: &ColumnChunkMetaData) -> bool {
/// // check that dictionary encoding was used
/// col_meta.dictionary_page_offset().is_some()
/// && col_meta.page_encoding_stats_mask().is_some_and(|mask| {
/// // mask should only have one bit set, either for PLAIN_DICTIONARY or
/// // RLE_DICTIONARY
/// mask.is_only(Encoding::PLAIN_DICTIONARY) || mask.is_only(Encoding::RLE_DICTIONARY)
/// })
/// }
/// ```
pub fn page_encoding_stats_mask(&self) -> Option<&EncodingMask> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should be data_page_encoding_stats_mask (or just data_page_encoding_stats) to make it clear it only has the stats for data pages.

match self.encoding_stats.as_ref() {
Some(ParquetPageEncodingStats::Mask(stats)) => Some(stats),
_ => None,
}
}

/// Returns the offset for the bloom filter.
Expand Down Expand Up @@ -1273,8 +1320,18 @@ impl ColumnChunkMetaDataBuilder {
}

/// Sets page encoding stats for this column chunk.
///
/// This will overwrite any existing stats, either `Vec` based or bitmask.
pub fn set_page_encoding_stats(mut self, value: Vec<PageEncodingStats>) -> Self {
    // Storing the full list replaces any previously set bitmask.
    self.0.encoding_stats = Some(ParquetPageEncodingStats::Full(value));
    self
}

/// Sets page encoding stats mask for this column chunk.
///
/// This will overwrite any existing stats, either `Vec` based or bitmask.
pub fn set_page_encoding_stats_mask(mut self, value: EncodingMask) -> Self {
    // Storing a mask replaces any previously set full stats vector.
    self.0.encoding_stats = Some(ParquetPageEncodingStats::Mask(value));
    self
}

Expand Down
Loading
Loading