Skip to content
182 changes: 182 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::str::FromStr;
use std::{fmt, str};

pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
use crate::file::metadata::HeapSize;
use crate::parquet_thrift::{
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
WriteThrift, WriteThriftField,
Expand Down Expand Up @@ -724,6 +725,132 @@ impl FromStr for Encoding {
}
}

/// A bitmask representing the [`Encoding`]s employed while encoding a Parquet column chunk.
///
/// The Parquet [`ColumnMetaData`] struct contains an array that indicates what encodings were
/// used when writing that column chunk. For memory and performance reasons, this crate reduces
/// that array to a bitmask, where each bit position represents a different [`Encoding`]: bit `n`
/// is set when the [`Encoding`] whose integer discriminant is `n` is present. This struct
/// contains that bitmask, stored as a single `i32`, and provides methods to interact with the
/// data.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// # use parquet::basic::Encoding;
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// // read parquet metadata from a file
/// let file = open_parquet_file("some_path.parquet");
/// let mut reader = ParquetMetaDataReader::new();
/// reader.try_parse(&file).unwrap();
/// let metadata = reader.finish().unwrap();
///
/// // find the encodings used by the first column chunk in the first row group
/// let col_meta = metadata.row_group(0).column(0);
/// let encodings = col_meta.encodings_mask();
///
/// // check to see if a particular encoding was used
/// let used_rle = encodings.is_set(Encoding::RLE);
///
/// // check to see if all of a set of encodings were used
/// let used_all = encodings.all_set([Encoding::RLE, Encoding::PLAIN].iter());
///
/// // convert mask to a Vec<Encoding>
/// let encodings_vec = encodings.encodings().collect::<Vec<_>>();
/// ```
///
/// [`ColumnMetaData`]: https://github.com/apache/parquet-format/blob/9fd57b59e0ce1a82a69237dcf8977d3e72a2965d/src/main/thrift/parquet.thrift#L875
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EncodingMask(i32);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


impl EncodingMask {
/// Highest valued discriminant in the [`Encoding`] enum
const MAX_ENCODING: i32 = Encoding::BYTE_STREAM_SPLIT as i32;
/// A mask consisting of unused bit positions, used for validation. This includes the never
/// used GROUP_VAR_INT encoding value of `1`.
const ALLOWED_MASK: u32 =
!(1u32 << (EncodingMask::MAX_ENCODING as u32 + 1)).wrapping_sub(1) | 1 << 1;

/// Attempt to create a new `EncodingMask` from an integer.
///
/// This will return an error if a bit outside the allowable range is set.
pub fn try_new(val: i32) -> Result<Self> {
if val as u32 & Self::ALLOWED_MASK != 0 {
return Err(general_err!("Attempt to create invalid mask: 0x{:x}", val));
}
Ok(Self(val))
}

/// Return an integer representation of this `EncodingMask`.
pub fn as_i32(&self) -> i32 {
self.0
}

/// Create a new `EncodingMask` from a collection of [`Encoding`]s.
pub fn new_from_encodings<'a>(encodings: impl Iterator<Item = &'a Encoding>) -> Self {
let mut mask = 0;
for &e in encodings {
mask |= 1 << (e as i32);
}
Self(mask)
}

/// Test if a given [`Encoding`] is present in this mask.
pub fn is_set(&self, val: Encoding) -> bool {
self.0 & (1 << (val as i32)) != 0
}

/// Test if all [`Encoding`]s in a given set are present in this mask.
pub fn all_set<'a>(&self, mut encodings: impl Iterator<Item = &'a Encoding>) -> bool {
encodings.all(|&e| self.is_set(e))
}

/// Return an iterator over all [`Encoding`]s present in this mask.
pub fn encodings(&self) -> impl Iterator<Item = Encoding> {
Self::mask_to_encodings_iter(self.0)
}

fn mask_to_encodings_iter(mask: i32) -> impl Iterator<Item = Encoding> {
(0..=Self::MAX_ENCODING)
.filter(move |i| mask & (1 << i) != 0)
.map(i32_to_encoding)
}
}

// `EncodingMask` wraps a single `i32`, so it owns no heap memory.
impl HeapSize for EncodingMask {
// Always reports zero bytes of heap usage.
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for EncodingMask {
    /// Read a Thrift `list<Encoding>` from `prot` and collapse it into a bitmask.
    ///
    /// # Errors
    /// Propagates any error from reading the list header or one of its elements.
    fn read_thrift(prot: &mut R) -> Result<Self> {
        let list_ident = prot.read_list_begin()?;

        // One element is read per iteration; the first failure aborts the fold.
        let bits = (0..list_ident.size).try_fold(0i32, |acc, _| -> Result<i32> {
            let encoding = Encoding::read_thrift(prot)?;
            Ok(acc | (1i32 << (encoding as i32)))
        })?;

        Ok(Self(bits))
    }
}

/// Map an integer discriminant back to its [`Encoding`].
///
/// # Panics
/// Panics if `val` is not the discriminant of one of the encodings listed below.
// deprecated variants still need to be mapped, so the lint is silenced here
#[allow(deprecated)]
fn i32_to_encoding(val: i32) -> Encoding {
    // Indexed by discriminant; `None` marks a value with no corresponding `Encoding`.
    const BY_DISCRIMINANT: [Option<Encoding>; 10] = [
        Some(Encoding::PLAIN),
        None,
        Some(Encoding::PLAIN_DICTIONARY),
        Some(Encoding::RLE),
        Some(Encoding::BIT_PACKED),
        Some(Encoding::DELTA_BINARY_PACKED),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
        Some(Encoding::RLE_DICTIONARY),
        Some(Encoding::BYTE_STREAM_SPLIT),
    ];

    let found = if val < 0 {
        None
    } else {
        BY_DISCRIMINANT.get(val as usize).copied().flatten()
    };

    match found {
        Some(encoding) => encoding,
        None => panic!("Impossible encoding {val}"),
    }
}

// ----------------------------------------------------------------------
// Mirrors thrift enum `CompressionCodec`

Expand Down Expand Up @@ -2409,4 +2536,59 @@ mod tests {
assert_eq!(EdgeInterpolationAlgorithm::ANDOYER.to_string(), "ANDOYER");
assert_eq!(EdgeInterpolationAlgorithm::KARNEY.to_string(), "KARNEY");
}

/// Check that `encodings` survives a trip through [`EncodingMask`] and back.
fn encodings_roundtrip(mut encodings: Vec<Encoding>) {
    // sort first so the input can be compared directly with the mask's output order
    encodings.sort();

    let mask = EncodingMask::new_from_encodings(encodings.iter());
    assert!(mask.all_set(encodings.iter()));

    let recovered: Vec<Encoding> = mask.encodings().collect();
    assert_eq!(recovered, encodings);
}

#[test]
fn test_encoding_roundtrip() {
    // a small, unsorted subset
    encodings_roundtrip(vec![
        Encoding::RLE,
        Encoding::PLAIN,
        Encoding::DELTA_BINARY_PACKED,
    ]);

    // both dictionary encodings
    encodings_roundtrip(vec![
        Encoding::RLE_DICTIONARY,
        Encoding::PLAIN_DICTIONARY,
    ]);

    // the empty set
    encodings_roundtrip(Vec::new());

    // every encoding at once, deliberately out of order
    let every_encoding = vec![
        Encoding::PLAIN,
        Encoding::BIT_PACKED,
        Encoding::RLE,
        Encoding::DELTA_BINARY_PACKED,
        Encoding::DELTA_BYTE_ARRAY,
        Encoding::DELTA_LENGTH_BYTE_ARRAY,
        Encoding::PLAIN_DICTIONARY,
        Encoding::RLE_DICTIONARY,
        Encoding::BYTE_STREAM_SPLIT,
    ];
    encodings_roundtrip(every_encoding);
}

#[test]
fn test_invalid_encoding_mask() {
    // Each case pairs a raw mask that must be rejected with the exact error text expected.
    let rejected = [
        // any set bits higher than the max should trigger an error
        (
            -1,
            "Parquet error: Attempt to create invalid mask: 0xffffffff",
        ),
        // GROUP_VAR_INT (bit 1) is disallowed
        (2, "Parquet error: Attempt to create invalid mask: 0x2"),
    ];

    for (raw, expected) in rejected {
        let res = EncodingMask::try_new(raw);
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().to_string(), expected);
    }
}
}
19 changes: 11 additions & 8 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::collections::{BTreeSet, VecDeque};
use std::str;

use crate::basic::{
BoundaryOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Type,
BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
Expand Down Expand Up @@ -1190,7 +1190,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
.set_encodings(self.encodings.iter().cloned().collect())
.set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
.set_page_encoding_stats(self.encoding_stats.clone())
.set_total_compressed_size(total_compressed_size)
.set_total_uncompressed_size(total_uncompressed_size)
Expand Down Expand Up @@ -1734,7 +1734,10 @@ mod tests {
assert_eq!(r.rows_written, 4);

let metadata = r.metadata;
assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
assert_eq!(
metadata.encodings().collect::<Vec<_>>(),
vec![Encoding::PLAIN, Encoding::RLE]
);
assert_eq!(metadata.num_values(), 4); // just values
assert_eq!(metadata.dictionary_page_offset(), None);
}
Expand Down Expand Up @@ -2095,8 +2098,8 @@ mod tests {

let metadata = r.metadata;
assert_eq!(
metadata.encodings(),
&vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
metadata.encodings().collect::<Vec<_>>(),
vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
);
assert_eq!(metadata.num_values(), 4);
assert_eq!(metadata.compressed_size(), 20);
Expand Down Expand Up @@ -2221,8 +2224,8 @@ mod tests {

let metadata = r.metadata;
assert_eq!(
metadata.encodings(),
&vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
metadata.encodings().collect::<Vec<_>>(),
vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
);
assert_eq!(metadata.num_values(), 4);
assert_eq!(metadata.compressed_size(), 20);
Expand Down Expand Up @@ -4100,7 +4103,7 @@ mod tests {
.build();
let meta = column_write_and_get_metadata::<T>(props, data);
assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
assert_eq!(meta.encodings(), encodings);
assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
}

Expand Down
Loading
Loading