Skip to content

Truncate Parquet data page statistics #7555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,9 +1324,12 @@ mod tests {
use super::*;

use std::fs::File;
use std::io::Seek;

use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use crate::arrow::ARROW_SCHEMA_META_KEY;
use crate::format::PageHeader;
use crate::thrift::TCompactSliceInputProtocol;
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -3766,4 +3769,68 @@ mod tests {
.unwrap();
assert_eq!(batches.len(), 0);
}

#[test]
fn test_page_stats_truncation() {
    // A single value, longer than the truncate length configured below, is
    // written to both a Utf8 column and a Binary column.
    const VALUE: &str = "Blart Versenwald III";

    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Binary, false),
    ]);

    let string_values = StringArray::from(vec![VALUE]);
    let binary_values = BinaryArray::from(vec![VALUE.as_bytes()]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(string_values), Arc::new(binary_values)],
    )
    .unwrap();

    // Plain, uncompressed, non-dictionary pages with min/max limited to 2 bytes.
    let props = WriterProperties::builder()
        .set_statistics_truncate_length(Some(2))
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .set_compression(crate::basic::Compression::UNCOMPRESSED)
        .build();

    let mut file = roundtrip_opts(&batch, props);

    // Pull the raw file bytes back in and decode the page headers directly.
    // Note: the thrift API is used because there is no Rust API to access the
    // statistics stored in the page headers.
    let mut bytes = vec![];
    file.seek(std::io::SeekFrom::Start(0)).unwrap();
    let bytes_read = file.read_to_end(&mut bytes).unwrap();
    assert!(bytes_read > 0);

    // Both pages are expected to carry identical, truncated (hence inexact)
    // min/max statistics.
    let assert_truncated = |header: &PageHeader| {
        let stats = header
            .data_page_header
            .as_ref()
            .unwrap()
            .statistics
            .as_ref()
            .unwrap();
        assert!(!stats.is_max_value_exact.unwrap());
        assert!(!stats.is_min_value_exact.unwrap());
        assert_eq!(stats.max_value.as_deref().unwrap(), "Bm".as_bytes());
        assert_eq!(stats.min_value.as_deref().unwrap(), "Bl".as_bytes());
    };

    // The first page header starts after the 4 leading bytes of the file
    // (presumably the Parquet magic number).
    let mut prot = TCompactSliceInputProtocol::new(&bytes[4..]);
    let first_header = PageHeader::read_from_in_protocol(&mut prot).unwrap();
    assert_truncated(&first_header);

    // Step over the first page's payload to land on the second page header.
    let payload_len = first_header.compressed_page_size as usize;
    let remainder = &prot.as_slice()[payload_len..];
    let mut prot = TCompactSliceInputProtocol::new(remainder);
    let second_header = PageHeader::read_from_in_protocol(&mut prot).unwrap();
    assert_truncated(&second_header);
}
}
102 changes: 55 additions & 47 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,59 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.unwrap_or_else(|| (data.to_vec(), false))
}

/// Truncate the min and max values that will be written to a data page
/// header or column chunk Statistics.
///
/// The limit is taken from `self.props.statistics_truncate_length()`. Only two
/// cases are rewritten, and only when min/max are set:
///
/// * `Statistics::ByteArray`
/// * `Statistics::FixedLenByteArray`, when `self.can_truncate_value()` holds
///
/// In both cases the min/max exactness flags of the returned statistics are
/// set to the negation of the "did truncate" result reported by
/// `truncate_min_value` / `truncate_max_value`. Any other statistics are
/// returned unchanged.
fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
    let max_len = self.props.statistics_truncate_length();
    let backwards_compatible_min_max = self.descr.sort_order().is_signed();

    match statistics {
        Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
            // The guard ensures min/max are set, so the unwraps are not expected to fail.
            let (lower, lower_truncated) =
                self.truncate_min_value(max_len, stats.min_bytes_opt().unwrap());
            let (upper, upper_truncated) =
                self.truncate_max_value(max_len, stats.max_bytes_opt().unwrap());

            Statistics::ByteArray(
                ValueStatistics::new(
                    Some(lower.into()),
                    Some(upper.into()),
                    stats.distinct_count(),
                    stats.null_count_opt(),
                    backwards_compatible_min_max,
                )
                .with_min_is_exact(!lower_truncated)
                .with_max_is_exact(!upper_truncated),
            )
        }
        Statistics::FixedLenByteArray(stats)
            if stats._internal_has_min_max_set() && self.can_truncate_value() =>
        {
            // The guard ensures min/max are set, so the unwraps are not expected to fail.
            let (lower, lower_truncated) =
                self.truncate_min_value(max_len, stats.min_bytes_opt().unwrap());
            let (upper, upper_truncated) =
                self.truncate_max_value(max_len, stats.max_bytes_opt().unwrap());

            Statistics::FixedLenByteArray(
                ValueStatistics::new(
                    Some(lower.into()),
                    Some(upper.into()),
                    stats.distinct_count(),
                    stats.null_count_opt(),
                    backwards_compatible_min_max,
                )
                .with_min_is_exact(!lower_truncated)
                .with_max_is_exact(!upper_truncated),
            )
        }
        // Everything else is passed through as-is.
        untouched => untouched,
    }
}

/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self) -> Result<()> {
Expand Down Expand Up @@ -992,6 +1045,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.update_variable_length_bytes(values_data.variable_length_bytes);

let page_statistics = page_statistics.map(Statistics::from);
let page_statistics = page_statistics.map(|stats| self.truncate_statistics(stats));

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
Expand Down Expand Up @@ -1147,53 +1201,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.with_backwards_compatible_min_max(backwards_compatible_min_max)
.into();

let statistics = match statistics {
Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
let (min, did_truncate_min) = self.truncate_min_value(
self.props.statistics_truncate_length(),
stats.min_bytes_opt().unwrap(),
);
let (max, did_truncate_max) = self.truncate_max_value(
self.props.statistics_truncate_length(),
stats.max_bytes_opt().unwrap(),
);
Statistics::ByteArray(
ValueStatistics::new(
Some(min.into()),
Some(max.into()),
stats.distinct_count(),
stats.null_count_opt(),
backwards_compatible_min_max,
)
.with_max_is_exact(!did_truncate_max)
.with_min_is_exact(!did_truncate_min),
)
}
Statistics::FixedLenByteArray(stats)
if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
{
let (min, did_truncate_min) = self.truncate_min_value(
self.props.statistics_truncate_length(),
stats.min_bytes_opt().unwrap(),
);
let (max, did_truncate_max) = self.truncate_max_value(
self.props.statistics_truncate_length(),
stats.max_bytes_opt().unwrap(),
);
Statistics::FixedLenByteArray(
ValueStatistics::new(
Some(min.into()),
Some(max.into()),
stats.distinct_count(),
stats.null_count_opt(),
backwards_compatible_min_max,
)
.with_max_is_exact(!did_truncate_max)
.with_min_is_exact(!did_truncate_min),
)
}
stats => stats,
};
let statistics = self.truncate_statistics(statistics);

builder = builder
.set_statistics(statistics)
Expand Down
16 changes: 12 additions & 4 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,13 @@ impl WriterProperties {
self.column_index_truncate_length
}

/// Returns the maximum length of truncated min/max values in statistics.
/// Returns the maximum length of truncated min/max values in [`Statistics`].
///
/// `None` if truncation is disabled, must be greater than 0 otherwise.
///
/// For more details see [`WriterPropertiesBuilder::set_statistics_truncate_length`]
///
/// [`Statistics`]: crate::file::statistics::Statistics
pub fn statistics_truncate_length(&self) -> Option<usize> {
self.statistics_truncate_length
}
Expand Down Expand Up @@ -646,16 +648,22 @@ impl WriterPropertiesBuilder {
self
}

/// Sets the max length of min/max value fields in row group level
/// Sets the max length of min/max value fields in row group and data page header
/// [`Statistics`] (defaults to `None` (no limit) via [`DEFAULT_STATISTICS_TRUNCATE_LENGTH`]).
///
/// # Notes
/// Row group level [`Statistics`] are written when [`Self::set_statistics_enabled`] is
/// set to [`EnabledStatistics::Chunk`] or [`EnabledStatistics::Page`].
/// Row group [`Statistics`] are written when [`Self::set_statistics_enabled`] is
/// set to [`EnabledStatistics::Chunk`] or [`EnabledStatistics::Page`]. Data page header
/// [`Statistics`] are written when [`Self::set_statistics_enabled`] is set to
/// [`EnabledStatistics::Page`].
///
/// * If `Some`, must be greater than 0, otherwise will panic
/// * If `None`, there's no effective limit.
///
/// # See also
/// Truncation of Page Index statistics is controlled separately via
/// [`WriterPropertiesBuilder::set_column_index_truncate_length`]
///
/// [`Statistics`]: crate::file::statistics::Statistics
pub fn set_statistics_truncate_length(mut self, max_length: Option<usize>) -> Self {
if let Some(value) = max_length {
Expand Down
Loading