Skip to content

Commit b51a000

Browse files
authored
[thrift-remodel] Remove conversion functions for row group and column metadata (#8574)
# Which issue does this PR close? - Part of #5853 - Closes #8517. # Rationale for this change A good bit (around 15%) of Parquet metadata parsing involves first decoding to thrift structs (`FileMetaData`, `RowGroup`, etc), and then converting to the metadata structs used by this crate (`ParquetMetaData`, `RowGroupMetaData`, etc). This PR removes some of the intermediate structures and parses straight to the crate structs. # What changes are included in this PR? Some thrift generated structures are removed, and the code necessary to decode has been hand written. This will enable future optimizations such as selectively decoding parts of the metadata. In addition to the above, this PR cleans up some of the memory size computation, and also boxes some of the objects used for decryption. # Are these changes tested? Should be covered by existing tests. # Are there any user-facing changes? No, only private APIs are changed.
1 parent d90faef commit b51a000

File tree

9 files changed

+827
-380
lines changed

9 files changed

+827
-380
lines changed

parquet/src/file/metadata/memory.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ impl<T: HeapSize> HeapSize for Arc<T> {
5656
}
5757
}
5858

59+
impl<T: HeapSize> HeapSize for Box<T> {
60+
fn heap_size(&self) -> usize {
61+
std::mem::size_of::<T>() + self.as_ref().heap_size()
62+
}
63+
}
64+
5965
impl<T: HeapSize> HeapSize for Option<T> {
6066
fn heap_size(&self) -> usize {
6167
self.as_ref().map(|inner| inner.heap_size()).unwrap_or(0)
@@ -70,10 +76,17 @@ impl HeapSize for String {
7076

7177
impl HeapSize for FileMetaData {
7278
fn heap_size(&self) -> usize {
79+
#[cfg(feature = "encryption")]
80+
let encryption_heap_size =
81+
self.encryption_algorithm.heap_size() + self.footer_signing_key_metadata.heap_size();
82+
#[cfg(not(feature = "encryption"))]
83+
let encryption_heap_size = 0;
84+
7385
self.created_by.heap_size()
7486
+ self.key_value_metadata.heap_size()
7587
+ self.schema_descr.heap_size()
7688
+ self.column_orders.heap_size()
89+
+ encryption_heap_size
7790
}
7891
}
7992

@@ -109,6 +122,7 @@ impl HeapSize for ColumnChunkMetaData {
109122
+ self.unencoded_byte_array_data_bytes.heap_size()
110123
+ self.repetition_level_histogram.heap_size()
111124
+ self.definition_level_histogram.heap_size()
125+
+ self.geo_statistics.heap_size()
112126
+ encryption_heap_size
113127
}
114128
}
@@ -141,6 +155,7 @@ impl HeapSize for PageType {
141155
0 // no heap allocations
142156
}
143157
}
158+
144159
impl HeapSize for Statistics {
145160
fn heap_size(&self) -> usize {
146161
match self {

parquet/src/file/metadata/mod.rs

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ use crate::encryption::decrypt::FileDecryptor;
101101
#[cfg(feature = "encryption")]
102102
use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
103103
pub(crate) use crate::file::metadata::memory::HeapSize;
104+
#[cfg(feature = "encryption")]
105+
use crate::file::metadata::thrift_gen::EncryptionAlgorithm;
104106
use crate::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex};
105107
use crate::file::page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation};
106108
use crate::file::statistics::Statistics;
@@ -194,7 +196,7 @@ pub struct ParquetMetaData {
194196
offset_index: Option<ParquetOffsetIndex>,
195197
/// Optional file decryptor
196198
#[cfg(feature = "encryption")]
197-
file_decryptor: Option<FileDecryptor>,
199+
file_decryptor: Option<Box<FileDecryptor>>,
198200
}
199201

200202
impl ParquetMetaData {
@@ -215,7 +217,7 @@ impl ParquetMetaData {
215217
/// encrypted data.
216218
#[cfg(feature = "encryption")]
217219
pub(crate) fn with_file_decryptor(&mut self, file_decryptor: Option<FileDecryptor>) {
218-
self.file_decryptor = file_decryptor;
220+
self.file_decryptor = file_decryptor.map(Box::new);
219221
}
220222

221223
/// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`]
@@ -231,7 +233,7 @@ impl ParquetMetaData {
231233
/// Returns file decryptor as reference.
232234
#[cfg(feature = "encryption")]
233235
pub(crate) fn file_decryptor(&self) -> Option<&FileDecryptor> {
234-
self.file_decryptor.as_ref()
236+
self.file_decryptor.as_deref()
235237
}
236238

237239
/// Returns number of row groups in this file.
@@ -411,6 +413,13 @@ impl ParquetMetaDataBuilder {
411413
self.0.offset_index.as_ref()
412414
}
413415

416+
/// Sets the file decryptor needed to decrypt this metadata.
417+
#[cfg(feature = "encryption")]
418+
pub(crate) fn set_file_decryptor(mut self, file_decryptor: Option<FileDecryptor>) -> Self {
419+
self.0.with_file_decryptor(file_decryptor);
420+
self
421+
}
422+
414423
/// Creates a new ParquetMetaData from the builder
415424
pub fn build(self) -> ParquetMetaData {
416425
let Self(metadata) = self;
@@ -468,6 +477,10 @@ pub struct FileMetaData {
468477
key_value_metadata: Option<Vec<KeyValue>>,
469478
schema_descr: SchemaDescPtr,
470479
column_orders: Option<Vec<ColumnOrder>>,
480+
#[cfg(feature = "encryption")]
481+
encryption_algorithm: Option<Box<EncryptionAlgorithm>>,
482+
#[cfg(feature = "encryption")]
483+
footer_signing_key_metadata: Option<Vec<u8>>,
471484
}
472485

473486
impl FileMetaData {
@@ -487,9 +500,31 @@ impl FileMetaData {
487500
key_value_metadata,
488501
schema_descr,
489502
column_orders,
503+
#[cfg(feature = "encryption")]
504+
encryption_algorithm: None,
505+
#[cfg(feature = "encryption")]
506+
footer_signing_key_metadata: None,
490507
}
491508
}
492509

510+
#[cfg(feature = "encryption")]
511+
pub(crate) fn with_encryption_algorithm(
512+
mut self,
513+
encryption_algorithm: Option<EncryptionAlgorithm>,
514+
) -> Self {
515+
self.encryption_algorithm = encryption_algorithm.map(Box::new);
516+
self
517+
}
518+
519+
#[cfg(feature = "encryption")]
520+
pub(crate) fn with_footer_signing_key_metadata(
521+
mut self,
522+
footer_signing_key_metadata: Option<Vec<u8>>,
523+
) -> Self {
524+
self.footer_signing_key_metadata = footer_signing_key_metadata;
525+
self
526+
}
527+
493528
/// Returns version of this file.
494529
pub fn version(&self) -> i32 {
495530
self.version
@@ -776,7 +811,7 @@ pub struct ColumnChunkMetaData {
776811
repetition_level_histogram: Option<LevelHistogram>,
777812
definition_level_histogram: Option<LevelHistogram>,
778813
#[cfg(feature = "encryption")]
779-
column_crypto_metadata: Option<ColumnCryptoMetaData>,
814+
column_crypto_metadata: Option<Box<ColumnCryptoMetaData>>,
780815
#[cfg(feature = "encryption")]
781816
encrypted_column_metadata: Option<Vec<u8>>,
782817
}
@@ -1077,7 +1112,7 @@ impl ColumnChunkMetaData {
10771112
/// Returns the encryption metadata for this column chunk.
10781113
#[cfg(feature = "encryption")]
10791114
pub fn crypto_metadata(&self) -> Option<&ColumnCryptoMetaData> {
1080-
self.column_crypto_metadata.as_ref()
1115+
self.column_crypto_metadata.as_deref()
10811116
}
10821117

10831118
/// Converts this [`ColumnChunkMetaData`] into a [`ColumnChunkMetaDataBuilder`]
@@ -1283,7 +1318,14 @@ impl ColumnChunkMetaDataBuilder {
12831318
#[cfg(feature = "encryption")]
12841319
/// Set the encryption metadata for an encrypted column
12851320
pub fn set_column_crypto_metadata(mut self, value: Option<ColumnCryptoMetaData>) -> Self {
1286-
self.0.column_crypto_metadata = value;
1321+
self.0.column_crypto_metadata = value.map(Box::new);
1322+
self
1323+
}
1324+
1325+
#[cfg(feature = "encryption")]
1326+
/// Set the encryption metadata for an encrypted column
1327+
pub fn set_encrypted_column_metadata(mut self, value: Option<Vec<u8>>) -> Self {
1328+
self.0.encrypted_column_metadata = value;
12871329
self
12881330
}
12891331

@@ -1818,7 +1860,7 @@ mod tests {
18181860
#[cfg(not(feature = "encryption"))]
18191861
let base_expected_size = 2312;
18201862
#[cfg(feature = "encryption")]
1821-
let base_expected_size = 2744;
1863+
let base_expected_size = 2480;
18221864

18231865
assert_eq!(parquet_meta.memory_size(), base_expected_size);
18241866

@@ -1849,7 +1891,7 @@ mod tests {
18491891
#[cfg(not(feature = "encryption"))]
18501892
let bigger_expected_size = 2738;
18511893
#[cfg(feature = "encryption")]
1852-
let bigger_expected_size = 3170;
1894+
let bigger_expected_size = 2906;
18531895

18541896
// more set fields means more memory usage
18551897
assert!(bigger_expected_size > base_expected_size);

parquet/src/file/metadata/parser.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
//! into the corresponding Rust structures
2222
2323
use crate::errors::ParquetError;
24+
use crate::file::metadata::thrift_gen::parquet_metadata_from_bytes;
2425
use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
2526

2627
use crate::file::page_index::column_index::ColumnIndexMetaData;
2728
use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
2829
use crate::file::page_index::offset_index::OffsetIndexMetaData;
29-
use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
3030
use bytes::Bytes;
3131

3232
/// Helper struct for metadata parsing
@@ -71,11 +71,15 @@ mod inner {
7171
buf: &[u8],
7272
encrypted_footer: bool,
7373
) -> Result<ParquetMetaData> {
74-
crate::file::metadata::thrift_gen::parquet_metadata_with_encryption(
75-
self.file_decryption_properties.as_deref(),
76-
encrypted_footer,
77-
buf,
78-
)
74+
if encrypted_footer || self.file_decryption_properties.is_some() {
75+
crate::file::metadata::thrift_gen::parquet_metadata_with_encryption(
76+
self.file_decryption_properties.as_deref(),
77+
encrypted_footer,
78+
buf,
79+
)
80+
} else {
81+
decode_metadata(buf)
82+
}
7983
}
8084
}
8185

@@ -195,8 +199,7 @@ mod inner {
195199
///
196200
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
197201
pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
198-
let mut prot = ThriftSliceInputProtocol::new(buf);
199-
ParquetMetaData::read_thrift(&mut prot)
202+
parquet_metadata_from_bytes(buf)
200203
}
201204

202205
/// Parses column index from the provided bytes and adds it to the metadata.

0 commit comments

Comments
 (0)