Skip to content

Commit b430682

Browse files
committed
store encodings as bitmask
1 parent 09fc5ea commit b430682

File tree

6 files changed: +256 additions, −51 deletions

parquet/src/basic.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,58 @@ impl FromStr for Encoding {
724724
}
725725
}
726726

727+
const MAX_ENCODING: i32 = Encoding::BYTE_STREAM_SPLIT as i32;
728+
729+
/// Given a Thrift input stream, read a vector of [`Encoding`] enums and convert to a bitmask.
730+
pub(super) fn thrift_encodings_to_mask<'a, R: ThriftCompactInputProtocol<'a>>(
731+
prot: &mut R,
732+
) -> Result<i32> {
733+
let mut mask = 0;
734+
735+
let list_ident = prot.read_list_begin()?;
736+
for _ in 0..list_ident.size {
737+
let val = i32::read_thrift(prot)?;
738+
if (0..=MAX_ENCODING).contains(&val) {
739+
mask |= 1 << val;
740+
}
741+
}
742+
Ok(mask)
743+
}
744+
745+
#[allow(deprecated)]
746+
fn i32_to_encoding(val: i32) -> Encoding {
747+
match val {
748+
0 => Encoding::PLAIN,
749+
2 => Encoding::PLAIN_DICTIONARY,
750+
3 => Encoding::RLE,
751+
4 => Encoding::BIT_PACKED,
752+
5 => Encoding::DELTA_BINARY_PACKED,
753+
6 => Encoding::DELTA_LENGTH_BYTE_ARRAY,
754+
7 => Encoding::DELTA_BYTE_ARRAY,
755+
8 => Encoding::RLE_DICTIONARY,
756+
9 => Encoding::BYTE_STREAM_SPLIT,
757+
_ => panic!("Impossible encoding {val}"),
758+
}
759+
}
760+
761+
pub(super) fn encodings_to_mask<'a, I>(encodings: I) -> i32
762+
where
763+
I: Iterator<Item = &'a Encoding>,
764+
{
765+
let mut mask = 0;
766+
for e in encodings {
767+
mask |= 1 << (*e as i32);
768+
}
769+
mask
770+
}
771+
772+
pub(super) fn mask_to_encodings_vec(mask: i32) -> Vec<Encoding> {
773+
(0..=MAX_ENCODING)
774+
.filter(|i| mask & (1 << i) != 0)
775+
.map(i32_to_encoding)
776+
.collect()
777+
}
778+
727779
// ----------------------------------------------------------------------
728780
// Mirrors thrift enum `CompressionCodec`
729781

parquet/src/column/writer/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use std::str;
2828

2929
use crate::basic::{
3030
BoundaryOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Type,
31+
encodings_to_mask,
3132
};
3233
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
3334
use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
@@ -1190,7 +1191,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
11901191

11911192
let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
11921193
.set_compression(self.codec)
1193-
.set_encodings(self.encodings.iter().cloned().collect())
1194+
.set_encodings_mask(encodings_to_mask(self.encodings.iter()))
11941195
.set_page_encoding_stats(self.encoding_stats.clone())
11951196
.set_total_compressed_size(total_compressed_size)
11961197
.set_total_uncompressed_size(total_uncompressed_size)
@@ -1734,7 +1735,7 @@ mod tests {
17341735
assert_eq!(r.rows_written, 4);
17351736

17361737
let metadata = r.metadata;
1737-
assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1738+
assert_eq!(metadata.encodings(), vec![Encoding::PLAIN, Encoding::RLE]);
17381739
assert_eq!(metadata.num_values(), 4); // just values
17391740
assert_eq!(metadata.dictionary_page_offset(), None);
17401741
}
@@ -2096,7 +2097,7 @@ mod tests {
20962097
let metadata = r.metadata;
20972098
assert_eq!(
20982099
metadata.encodings(),
2099-
&vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2100+
vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
21002101
);
21012102
assert_eq!(metadata.num_values(), 4);
21022103
assert_eq!(metadata.compressed_size(), 20);
@@ -2222,7 +2223,7 @@ mod tests {
22222223
let metadata = r.metadata;
22232224
assert_eq!(
22242225
metadata.encodings(),
2225-
&vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2226+
vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
22262227
);
22272228
assert_eq!(metadata.num_values(), 4);
22282229
assert_eq!(metadata.compressed_size(), 20);

parquet/src/file/metadata/encryption.rs

Lines changed: 172 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use std::io::Write;
2121

2222
use crate::{
23-
basic::{Compression, Encoding, Type},
23+
basic::{Compression, thrift_encodings_to_mask},
2424
encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
2525
errors::{ParquetError, Result},
2626
file::{
@@ -115,32 +115,6 @@ pub(crate) struct FileCryptoMetaData<'a> {
115115
}
116116
);
117117

118-
type CompressionCodec = Compression;
119-
120-
thrift_struct!(
121-
struct ColumnMetaData<'a> {
122-
1: required Type r#type
123-
2: required list<Encoding> encodings
124-
// we don't expose path_in_schema so skip
125-
//3: required list<string> path_in_schema
126-
4: required CompressionCodec codec
127-
5: required i64 num_values
128-
6: required i64 total_uncompressed_size
129-
7: required i64 total_compressed_size
130-
// we don't expose key_value_metadata so skip
131-
//8: optional list<KeyValue> key_value_metadata
132-
9: required i64 data_page_offset
133-
10: optional i64 index_page_offset
134-
11: optional i64 dictionary_page_offset
135-
12: optional Statistics<'a> statistics
136-
13: optional list<PageEncodingStats> encoding_stats;
137-
14: optional i64 bloom_filter_offset;
138-
15: optional i32 bloom_filter_length;
139-
16: optional SizeStatistics size_statistics;
140-
17: optional GeospatialStatistics geospatial_statistics;
141-
}
142-
);
143-
144118
fn row_group_from_encrypted_thrift(
145119
mut rg: RowGroupMetaData,
146120
decryptor: Option<&FileDecryptor>,
@@ -206,8 +180,7 @@ fn row_group_from_encrypted_thrift(
206180
})?;
207181

208182
// parse decrypted buffer and then replace fields in 'c'
209-
let mut prot = ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice());
210-
let col_meta = ColumnMetaData::read_thrift(&mut prot)?;
183+
let col_meta = read_column_metadata(decrypted_cc_buf.as_slice())?;
211184

212185
let (
213186
unencoded_byte_array_data_bytes,
@@ -234,7 +207,7 @@ fn row_group_from_encrypted_thrift(
234207
c.data_page_offset = col_meta.data_page_offset;
235208
c.index_page_offset = col_meta.index_page_offset;
236209
c.dictionary_page_offset = col_meta.dictionary_page_offset;
237-
c.statistics = convert_stats(col_meta.r#type, col_meta.statistics)?;
210+
c.statistics = convert_stats(d.physical_type(), col_meta.statistics)?;
238211
c.encoding_stats = col_meta.encoding_stats;
239212
c.bloom_filter_offset = col_meta.bloom_filter_offset;
240213
c.bloom_filter_length = col_meta.bloom_filter_length;
@@ -400,3 +373,172 @@ fn get_file_decryptor(
400373
)),
401374
}
402375
}
376+
377+
/// Decrypted subset of the Thrift `ColumnMetaData` struct, read inline by
/// `read_column_metadata`.
///
/// The Thrift `type` (id 1) and `path_in_schema` (id 3) fields are omitted
/// because the column descriptor already provides them, and
/// `key_value_metadata` (id 8) is not exposed.
#[derive(Clone, Debug, Eq, PartialEq)]
struct ColumnMetaData<'a> {
    /// Bitmask of encodings: bit `i` set means an encoding with discriminant `i` is present.
    encodings: i32,
    /// Compression codec used for the column chunk.
    codec: Compression,
    num_values: i64,
    total_uncompressed_size: i64,
    total_compressed_size: i64,
    data_page_offset: i64,
    index_page_offset: Option<i64>,
    dictionary_page_offset: Option<i64>,
    /// Statistics may borrow min/max bytes from the decrypted buffer (`'a`).
    statistics: Option<Statistics<'a>>,
    encoding_stats: Option<Vec<PageEncodingStats>>,
    bloom_filter_offset: Option<i64>,
    bloom_filter_length: Option<i32>,
    size_statistics: Option<SizeStatistics>,
    geospatial_statistics: Option<GeospatialStatistics>,
}
394+
395+
fn read_column_metadata<'a>(buf: &'a [u8]) -> Result<ColumnMetaData<'a>> {
396+
let mut prot = ThriftSliceInputProtocol::new(buf);
397+
398+
let mut encodings: Option<i32> = None;
399+
let mut codec: Option<Compression> = None;
400+
let mut num_values: Option<i64> = None;
401+
let mut total_uncompressed_size: Option<i64> = None;
402+
let mut total_compressed_size: Option<i64> = None;
403+
let mut data_page_offset: Option<i64> = None;
404+
let mut index_page_offset: Option<i64> = None;
405+
let mut dictionary_page_offset: Option<i64> = None;
406+
let mut statistics: Option<Statistics> = None;
407+
let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
408+
let mut bloom_filter_offset: Option<i64> = None;
409+
let mut bloom_filter_length: Option<i32> = None;
410+
let mut size_statistics: Option<SizeStatistics> = None;
411+
let mut geospatial_statistics: Option<GeospatialStatistics> = None;
412+
413+
// `ColumnMetaData`. Read inline for performance sake.
414+
// struct ColumnMetaData {
415+
// 1: required Type type
416+
// 2: required list<Encoding> encodings
417+
// 3: required list<string> path_in_schema
418+
// 4: required CompressionCodec codec
419+
// 5: required i64 num_values
420+
// 6: required i64 total_uncompressed_size
421+
// 7: required i64 total_compressed_size
422+
// 8: optional list<KeyValue> key_value_metadata
423+
// 9: required i64 data_page_offset
424+
// 10: optional i64 index_page_offset
425+
// 11: optional i64 dictionary_page_offset
426+
// 12: optional Statistics statistics;
427+
// 13: optional list<PageEncodingStats> encoding_stats;
428+
// 14: optional i64 bloom_filter_offset;
429+
// 15: optional i32 bloom_filter_length;
430+
// 16: optional SizeStatistics size_statistics;
431+
// 17: optional GeospatialStatistics geospatial_statistics;
432+
// }
433+
let mut last_field_id = 0i16;
434+
loop {
435+
let field_ident = prot.read_field_begin(last_field_id)?;
436+
if field_ident.field_type == FieldType::Stop {
437+
break;
438+
}
439+
match field_ident.id {
440+
// 1: type is never used, we can use the column descriptor
441+
2 => {
442+
let val = thrift_encodings_to_mask(&mut prot)?;
443+
encodings = Some(val);
444+
}
445+
// 3: path_in_schema is redundant
446+
4 => {
447+
codec = Some(Compression::read_thrift(&mut prot)?);
448+
}
449+
5 => {
450+
num_values = Some(i64::read_thrift(&mut prot)?);
451+
}
452+
6 => {
453+
total_uncompressed_size = Some(i64::read_thrift(&mut prot)?);
454+
}
455+
7 => {
456+
total_compressed_size = Some(i64::read_thrift(&mut prot)?);
457+
}
458+
// 8: we don't expose this key value
459+
9 => {
460+
data_page_offset = Some(i64::read_thrift(&mut prot)?);
461+
}
462+
10 => {
463+
index_page_offset = Some(i64::read_thrift(&mut prot)?);
464+
}
465+
11 => {
466+
dictionary_page_offset = Some(i64::read_thrift(&mut prot)?);
467+
}
468+
12 => {
469+
statistics = Some(Statistics::read_thrift(&mut prot)?);
470+
}
471+
13 => {
472+
let val =
473+
read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut prot)?;
474+
encoding_stats = Some(val);
475+
}
476+
14 => {
477+
bloom_filter_offset = Some(i64::read_thrift(&mut prot)?);
478+
}
479+
15 => {
480+
bloom_filter_length = Some(i32::read_thrift(&mut prot)?);
481+
}
482+
16 => {
483+
let val = SizeStatistics::read_thrift(&mut prot)?;
484+
size_statistics = Some(val);
485+
}
486+
17 => {
487+
let val = GeospatialStatistics::read_thrift(&mut prot)?;
488+
geospatial_statistics = Some(val);
489+
}
490+
_ => {
491+
prot.skip(field_ident.field_type)?;
492+
}
493+
};
494+
last_field_id = field_ident.id;
495+
}
496+
497+
let Some(encodings) = encodings else {
498+
return Err(ParquetError::General(
499+
"Required field encodings is missing".to_owned(),
500+
));
501+
};
502+
let Some(codec) = codec else {
503+
return Err(ParquetError::General(
504+
"Required field codec is missing".to_owned(),
505+
));
506+
};
507+
let Some(num_values) = num_values else {
508+
return Err(ParquetError::General(
509+
"Required field num_values is missing".to_owned(),
510+
));
511+
};
512+
let Some(total_uncompressed_size) = total_uncompressed_size else {
513+
return Err(ParquetError::General(
514+
"Required field total_uncompressed_size is missing".to_owned(),
515+
));
516+
};
517+
let Some(total_compressed_size) = total_compressed_size else {
518+
return Err(ParquetError::General(
519+
"Required field total_compressed_size is missing".to_owned(),
520+
));
521+
};
522+
let Some(data_page_offset) = data_page_offset else {
523+
return Err(ParquetError::General(
524+
"Required field data_page_offset is missing".to_owned(),
525+
));
526+
};
527+
528+
Ok(ColumnMetaData {
529+
encodings,
530+
num_values,
531+
codec,
532+
total_uncompressed_size,
533+
total_compressed_size,
534+
data_page_offset,
535+
index_page_offset,
536+
dictionary_page_offset,
537+
statistics,
538+
encoding_stats,
539+
bloom_filter_offset,
540+
bloom_filter_length,
541+
size_statistics,
542+
geospatial_statistics,
543+
})
544+
}

0 commit comments

Comments
 (0)