Skip to content

Commit c94698c

Browse files
authored
[thrift-remodel] Refactor Parquet Thrift code into new thrift module (#8599)
# Which issue does this PR close? - Part of #5853. # Rationale for this change Earlier work had introduced some code duplication in the decoding of the `ColumnMetaData` Thrift struct. This PR removes that duplication and also addresses earlier review comments (#8587 (comment)). # What changes are included in this PR? This PR changes how some metadata structures are parsed, using a flag for required fields rather than relying on `Option::is_some`. This allows passing around partially initialized `ColumnChunkMetaData` structs, which in turn allows the `ColumnMetaData` parsing code to be shared between the encrypted and unencrypted code paths. This PR also moves the `file/metadata/{encryption,thrift_gen}.rs` files to a new `file::metadata::thrift` module. # Are these changes tested? Covered by existing tests. # Are there any user-facing changes? No; this only changes private APIs.
1 parent b8fdd90 commit c94698c

File tree

13 files changed

+307
-600
lines changed

13 files changed

+307
-600
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,7 @@ mod tests {
15321532
use crate::arrow::ARROW_SCHEMA_META_KEY;
15331533
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
15341534
use crate::column::page::{Page, PageReader};
1535-
use crate::file::metadata::thrift_gen::PageHeader;
1535+
use crate::file::metadata::thrift::PageHeader;
15361536
use crate::file::page_index::column_index::ColumnIndexMetaData;
15371537
use crate::file::reader::SerializedPageReader;
15381538
use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};

parquet/src/column/page.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use bytes::Bytes;
2121

2222
use crate::basic::{Encoding, PageType};
2323
use crate::errors::{ParquetError, Result};
24-
use crate::file::metadata::thrift_gen::{
24+
use crate::file::metadata::thrift::{
2525
DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
2626
};
2727
use crate::file::statistics::{Statistics, page_stats_to_thrift};
@@ -345,11 +345,11 @@ pub struct PageMetadata {
345345
pub is_dict: bool,
346346
}
347347

348-
impl TryFrom<&crate::file::metadata::thrift_gen::PageHeader> for PageMetadata {
348+
impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
349349
type Error = ParquetError;
350350

351351
fn try_from(
352-
value: &crate::file::metadata::thrift_gen::PageHeader,
352+
value: &crate::file::metadata::thrift::PageHeader,
353353
) -> std::result::Result<Self, Self::Error> {
354354
match value.r#type {
355355
PageType::DATA_PAGE => {

parquet/src/column/page_encryption.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::encryption::encrypt::{FileEncryptor, encrypt_thrift_object};
2222
use crate::encryption::modules::{ModuleType, create_module_aad};
2323
use crate::errors::ParquetError;
2424
use crate::errors::Result;
25-
use crate::file::metadata::thrift_gen::PageHeader;
25+
use crate::file::metadata::thrift::PageHeader;
2626
use bytes::Bytes;
2727
use std::io::Write;
2828
use std::sync::Arc;

parquet/src/column/page_encryption_disabled.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::column::page::CompressedPage;
1919
use crate::errors::Result;
20-
use crate::file::metadata::thrift_gen::PageHeader;
20+
use crate::file::metadata::thrift::PageHeader;
2121
use std::io::Write;
2222

2323
#[derive(Debug)]

parquet/src/file/metadata/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,24 +87,22 @@
8787
//!
8888
//! * Same name, different struct
8989
//! ```
90-
#[cfg(feature = "encryption")]
91-
mod encryption;
9290
mod footer_tail;
9391
mod memory;
9492
mod parser;
9593
mod push_decoder;
9694
pub(crate) mod reader;
97-
pub(crate) mod thrift_gen;
95+
pub(crate) mod thrift;
9896
mod writer;
9997

10098
use crate::basic::{EncodingMask, PageType};
10199
#[cfg(feature = "encryption")]
102100
use crate::encryption::decrypt::FileDecryptor;
103101
#[cfg(feature = "encryption")]
104102
use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
105-
#[cfg(feature = "encryption")]
106-
use crate::file::metadata::encryption::EncryptionAlgorithm;
107103
pub(crate) use crate::file::metadata::memory::HeapSize;
104+
#[cfg(feature = "encryption")]
105+
use crate::file::metadata::thrift::encryption::EncryptionAlgorithm;
108106
use crate::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex};
109107
use crate::file::page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation};
110108
use crate::file::statistics::Statistics;
@@ -784,6 +782,11 @@ impl RowGroupMetaDataBuilder {
784782

785783
Ok(self.0)
786784
}
785+
786+
/// Build row group metadata without validation.
787+
pub(super) fn build_unchecked(self) -> RowGroupMetaData {
788+
self.0
789+
}
787790
}
788791

789792
/// Metadata for a column chunk.
@@ -1600,7 +1603,7 @@ impl OffsetIndexBuilder {
16001603
mod tests {
16011604
use super::*;
16021605
use crate::basic::{PageType, SortOrder};
1603-
use crate::file::metadata::thrift_gen::tests::{read_column_chunk, read_row_group};
1606+
use crate::file::metadata::thrift::tests::{read_column_chunk, read_row_group};
16041607

16051608
#[test]
16061609
fn test_row_group_metadata_thrift_conversion() {

parquet/src/file/metadata/parser.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
//! into the corresponding Rust structures
2222
2323
use crate::errors::ParquetError;
24-
use crate::file::metadata::thrift_gen::parquet_metadata_from_bytes;
24+
use crate::file::metadata::thrift::parquet_metadata_from_bytes;
2525
use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
2626

2727
use crate::file::page_index::column_index::ColumnIndexMetaData;
@@ -72,7 +72,7 @@ mod inner {
7272
encrypted_footer: bool,
7373
) -> Result<ParquetMetaData> {
7474
if encrypted_footer || self.file_decryption_properties.is_some() {
75-
crate::file::metadata::encryption::parquet_metadata_with_encryption(
75+
crate::file::metadata::thrift::encryption::parquet_metadata_with_encryption(
7676
self.file_decryption_properties.as_deref(),
7777
encrypted_footer,
7878
buf,

parquet/src/file/metadata/encryption.rs renamed to parquet/src/file/metadata/thrift/encryption.rs

Lines changed: 6 additions & 212 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,23 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
// a collection of generated structs used to parse thrift metadata
18+
//! Encryption support for Thrift serialization
1919
2020
use std::io::Write;
2121

2222
use crate::{
23-
basic::{Compression, EncodingMask},
2423
encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
2524
errors::{ParquetError, Result},
2625
file::{
2726
column_crypto_metadata::ColumnCryptoMetaData,
2827
metadata::{
29-
HeapSize, LevelHistogram, PageEncodingStats, ParquetMetaData, RowGroupMetaData,
30-
thrift_gen::{
31-
GeospatialStatistics, SizeStatistics, Statistics, convert_geo_stats, convert_stats,
32-
parquet_metadata_from_bytes,
33-
},
28+
HeapSize, ParquetMetaData, RowGroupMetaData,
29+
thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
3430
},
3531
},
3632
parquet_thrift::{
3733
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
3834
ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
39-
read_thrift_vec,
4035
},
4136
thrift_struct, thrift_union,
4237
};
@@ -180,41 +175,9 @@ fn row_group_from_encrypted_thrift(
180175
})?;
181176

182177
// parse decrypted buffer and then replace fields in 'c'
183-
let col_meta = read_column_metadata(decrypted_cc_buf.as_slice())?;
184-
185-
let (
186-
unencoded_byte_array_data_bytes,
187-
repetition_level_histogram,
188-
definition_level_histogram,
189-
) = if let Some(size_stats) = col_meta.size_statistics {
190-
(
191-
size_stats.unencoded_byte_array_data_bytes,
192-
size_stats.repetition_level_histogram,
193-
size_stats.definition_level_histogram,
194-
)
195-
} else {
196-
(None, None, None)
197-
};
198-
199-
let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
200-
let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
201-
202-
c.encodings = col_meta.encodings;
203-
c.compression = col_meta.codec;
204-
c.num_values = col_meta.num_values;
205-
c.total_uncompressed_size = col_meta.total_uncompressed_size;
206-
c.total_compressed_size = col_meta.total_compressed_size;
207-
c.data_page_offset = col_meta.data_page_offset;
208-
c.index_page_offset = col_meta.index_page_offset;
209-
c.dictionary_page_offset = col_meta.dictionary_page_offset;
210-
c.statistics = convert_stats(d.physical_type(), col_meta.statistics)?;
211-
c.encoding_stats = col_meta.encoding_stats;
212-
c.bloom_filter_offset = col_meta.bloom_filter_offset;
213-
c.bloom_filter_length = col_meta.bloom_filter_length;
214-
c.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
215-
c.repetition_level_histogram = repetition_level_histogram;
216-
c.definition_level_histogram = definition_level_histogram;
217-
c.geo_statistics = convert_geo_stats(col_meta.geospatial_statistics);
178+
let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
179+
let mask = read_column_metadata(&mut prot, &mut c)?;
180+
validate_column_metadata(mask)?;
218181

219182
columns.push(c);
220183
} else {
@@ -373,172 +336,3 @@ fn get_file_decryptor(
373336
)),
374337
}
375338
}
376-
377-
#[derive(Clone, Debug, Eq, PartialEq)]
378-
struct ColumnMetaData<'a> {
379-
encodings: EncodingMask,
380-
codec: Compression,
381-
num_values: i64,
382-
total_uncompressed_size: i64,
383-
total_compressed_size: i64,
384-
data_page_offset: i64,
385-
index_page_offset: Option<i64>,
386-
dictionary_page_offset: Option<i64>,
387-
statistics: Option<Statistics<'a>>,
388-
encoding_stats: Option<Vec<PageEncodingStats>>,
389-
bloom_filter_offset: Option<i64>,
390-
bloom_filter_length: Option<i32>,
391-
size_statistics: Option<SizeStatistics>,
392-
geospatial_statistics: Option<GeospatialStatistics>,
393-
}
394-
395-
fn read_column_metadata<'a>(buf: &'a [u8]) -> Result<ColumnMetaData<'a>> {
396-
let mut prot = ThriftSliceInputProtocol::new(buf);
397-
398-
let mut encodings: Option<EncodingMask> = None;
399-
let mut codec: Option<Compression> = None;
400-
let mut num_values: Option<i64> = None;
401-
let mut total_uncompressed_size: Option<i64> = None;
402-
let mut total_compressed_size: Option<i64> = None;
403-
let mut data_page_offset: Option<i64> = None;
404-
let mut index_page_offset: Option<i64> = None;
405-
let mut dictionary_page_offset: Option<i64> = None;
406-
let mut statistics: Option<Statistics> = None;
407-
let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
408-
let mut bloom_filter_offset: Option<i64> = None;
409-
let mut bloom_filter_length: Option<i32> = None;
410-
let mut size_statistics: Option<SizeStatistics> = None;
411-
let mut geospatial_statistics: Option<GeospatialStatistics> = None;
412-
413-
// `ColumnMetaData`. Read inline for performance sake.
414-
// struct ColumnMetaData {
415-
// 1: required Type type
416-
// 2: required list<Encoding> encodings
417-
// 3: required list<string> path_in_schema
418-
// 4: required CompressionCodec codec
419-
// 5: required i64 num_values
420-
// 6: required i64 total_uncompressed_size
421-
// 7: required i64 total_compressed_size
422-
// 8: optional list<KeyValue> key_value_metadata
423-
// 9: required i64 data_page_offset
424-
// 10: optional i64 index_page_offset
425-
// 11: optional i64 dictionary_page_offset
426-
// 12: optional Statistics statistics;
427-
// 13: optional list<PageEncodingStats> encoding_stats;
428-
// 14: optional i64 bloom_filter_offset;
429-
// 15: optional i32 bloom_filter_length;
430-
// 16: optional SizeStatistics size_statistics;
431-
// 17: optional GeospatialStatistics geospatial_statistics;
432-
// }
433-
let mut last_field_id = 0i16;
434-
loop {
435-
let field_ident = prot.read_field_begin(last_field_id)?;
436-
if field_ident.field_type == FieldType::Stop {
437-
break;
438-
}
439-
match field_ident.id {
440-
// 1: type is never used, we can use the column descriptor
441-
2 => {
442-
let val = EncodingMask::read_thrift(&mut prot)?;
443-
encodings = Some(val);
444-
}
445-
// 3: path_in_schema is redundant
446-
4 => {
447-
codec = Some(Compression::read_thrift(&mut prot)?);
448-
}
449-
5 => {
450-
num_values = Some(i64::read_thrift(&mut prot)?);
451-
}
452-
6 => {
453-
total_uncompressed_size = Some(i64::read_thrift(&mut prot)?);
454-
}
455-
7 => {
456-
total_compressed_size = Some(i64::read_thrift(&mut prot)?);
457-
}
458-
// 8: we don't expose this key value
459-
9 => {
460-
data_page_offset = Some(i64::read_thrift(&mut prot)?);
461-
}
462-
10 => {
463-
index_page_offset = Some(i64::read_thrift(&mut prot)?);
464-
}
465-
11 => {
466-
dictionary_page_offset = Some(i64::read_thrift(&mut prot)?);
467-
}
468-
12 => {
469-
statistics = Some(Statistics::read_thrift(&mut prot)?);
470-
}
471-
13 => {
472-
let val =
473-
read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut prot)?;
474-
encoding_stats = Some(val);
475-
}
476-
14 => {
477-
bloom_filter_offset = Some(i64::read_thrift(&mut prot)?);
478-
}
479-
15 => {
480-
bloom_filter_length = Some(i32::read_thrift(&mut prot)?);
481-
}
482-
16 => {
483-
let val = SizeStatistics::read_thrift(&mut prot)?;
484-
size_statistics = Some(val);
485-
}
486-
17 => {
487-
let val = GeospatialStatistics::read_thrift(&mut prot)?;
488-
geospatial_statistics = Some(val);
489-
}
490-
_ => {
491-
prot.skip(field_ident.field_type)?;
492-
}
493-
};
494-
last_field_id = field_ident.id;
495-
}
496-
497-
let Some(encodings) = encodings else {
498-
return Err(ParquetError::General(
499-
"Required field encodings is missing".to_owned(),
500-
));
501-
};
502-
let Some(codec) = codec else {
503-
return Err(ParquetError::General(
504-
"Required field codec is missing".to_owned(),
505-
));
506-
};
507-
let Some(num_values) = num_values else {
508-
return Err(ParquetError::General(
509-
"Required field num_values is missing".to_owned(),
510-
));
511-
};
512-
let Some(total_uncompressed_size) = total_uncompressed_size else {
513-
return Err(ParquetError::General(
514-
"Required field total_uncompressed_size is missing".to_owned(),
515-
));
516-
};
517-
let Some(total_compressed_size) = total_compressed_size else {
518-
return Err(ParquetError::General(
519-
"Required field total_compressed_size is missing".to_owned(),
520-
));
521-
};
522-
let Some(data_page_offset) = data_page_offset else {
523-
return Err(ParquetError::General(
524-
"Required field data_page_offset is missing".to_owned(),
525-
));
526-
};
527-
528-
Ok(ColumnMetaData {
529-
encodings,
530-
num_values,
531-
codec,
532-
total_uncompressed_size,
533-
total_compressed_size,
534-
data_page_offset,
535-
index_page_offset,
536-
dictionary_page_offset,
537-
statistics,
538-
encoding_stats,
539-
bloom_filter_offset,
540-
bloom_filter_length,
541-
size_statistics,
542-
geospatial_statistics,
543-
})
544-
}

0 commit comments

Comments (0)