parquet/src/arrow/arrow_writer/mod.rs (2 changes: 1 addition & 1 deletion)
@@ -1524,7 +1524,7 @@ mod tests {
     use crate::arrow::ARROW_SCHEMA_META_KEY;
     use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
     use crate::column::page::{Page, PageReader};
-    use crate::file::metadata::thrift_gen::PageHeader;
+    use crate::file::metadata::thrift::PageHeader;
     use crate::file::page_index::column_index::ColumnIndexMetaData;
     use crate::file::reader::SerializedPageReader;
     use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
parquet/src/column/page.rs (6 changes: 3 additions & 3 deletions)
@@ -21,7 +21,7 @@ use bytes::Bytes;
 
 use crate::basic::{Encoding, PageType};
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::thrift_gen::{
+use crate::file::metadata::thrift::{
     DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
 };
 use crate::file::statistics::{Statistics, page_stats_to_thrift};
@@ -345,11 +345,11 @@ pub struct PageMetadata {
     pub is_dict: bool,
 }
 
-impl TryFrom<&crate::file::metadata::thrift_gen::PageHeader> for PageMetadata {
+impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
     type Error = ParquetError;
 
     fn try_from(
-        value: &crate::file::metadata::thrift_gen::PageHeader,
+        value: &crate::file::metadata::thrift::PageHeader,
     ) -> std::result::Result<Self, Self::Error> {
         match value.r#type {
             PageType::DATA_PAGE => {
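The `TryFrom<&PageHeader> for PageMetadata` impl above dispatches on the Thrift page type to build the reader's lightweight page descriptor, and fails for page types it does not track. A minimal sketch of that dispatch pattern, using simplified stand-in types rather than the crate's actual definitions:

```rust
// Sketch of converting a decoded page header into a small metadata
// struct keyed on page type. `PageKind`, `Header`, and `Meta` are
// illustrative stand-ins, not the parquet crate's real types.
#[derive(Clone, Copy)]
enum PageKind {
    DataPage,
    DataPageV2,
    DictionaryPage,
    IndexPage,
}

struct Header {
    kind: PageKind,
    num_values: Option<i32>,
}

struct Meta {
    num_rows: Option<usize>,
    is_dict: bool,
}

impl TryFrom<&Header> for Meta {
    type Error = String;

    fn try_from(h: &Header) -> Result<Self, Self::Error> {
        match h.kind {
            // V1 data pages do not carry a row count in the header.
            PageKind::DataPage => Ok(Meta { num_rows: None, is_dict: false }),
            PageKind::DataPageV2 => Ok(Meta {
                num_rows: h.num_values.map(|n| n as usize),
                is_dict: false,
            }),
            PageKind::DictionaryPage => Ok(Meta { num_rows: None, is_dict: true }),
            // The conversion is fallible: unsupported page types are an error.
            PageKind::IndexPage => Err("page type has no page metadata".to_owned()),
        }
    }
}

fn main() {
    let dict = Header { kind: PageKind::DictionaryPage, num_values: None };
    assert!(Meta::try_from(&dict).unwrap().is_dict);
}
```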
parquet/src/column/page_encryption.rs (2 changes: 1 addition & 1 deletion)
@@ -22,7 +22,7 @@ use crate::encryption::encrypt::{FileEncryptor, encrypt_thrift_object};
 use crate::encryption::modules::{ModuleType, create_module_aad};
 use crate::errors::ParquetError;
 use crate::errors::Result;
-use crate::file::metadata::thrift_gen::PageHeader;
+use crate::file::metadata::thrift::PageHeader;
 use bytes::Bytes;
 use std::io::Write;
 use std::sync::Arc;
parquet/src/column/page_encryption_disabled.rs (2 changes: 1 addition & 1 deletion)
@@ -17,7 +17,7 @@
 
 use crate::column::page::CompressedPage;
 use crate::errors::Result;
-use crate::file::metadata::thrift_gen::PageHeader;
+use crate::file::metadata::thrift::PageHeader;
 use std::io::Write;
 
 #[derive(Debug)]
parquet/src/file/metadata/mod.rs (15 changes: 9 additions & 6 deletions)
@@ -87,24 +87,22 @@
 //!
 //! * Same name, different struct
 //! ```
-#[cfg(feature = "encryption")]
-mod encryption;
 mod footer_tail;
 mod memory;
 mod parser;
 mod push_decoder;
 pub(crate) mod reader;
-pub(crate) mod thrift_gen;
+pub(crate) mod thrift;
 mod writer;
 
 use crate::basic::{EncodingMask, PageType};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::FileDecryptor;
 #[cfg(feature = "encryption")]
 use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
-#[cfg(feature = "encryption")]
-use crate::file::metadata::encryption::EncryptionAlgorithm;
 pub(crate) use crate::file::metadata::memory::HeapSize;
+#[cfg(feature = "encryption")]
+use crate::file::metadata::thrift::encryption::EncryptionAlgorithm;
 use crate::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex};
 use crate::file::page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation};
 use crate::file::statistics::Statistics;
@@ -784,6 +782,11 @@ impl RowGroupMetaDataBuilder {
 
         Ok(self.0)
     }
+
+    /// Build row group metadata without validation.
+    pub(super) fn build_unchecked(self) -> RowGroupMetaData {
+        self.0
+    }
 }
 
 /// Metadata for a column chunk.
@@ -1600,7 +1603,7 @@ impl OffsetIndexBuilder {
 mod tests {
     use super::*;
     use crate::basic::{PageType, SortOrder};
-    use crate::file::metadata::thrift_gen::tests::{read_column_chunk, read_row_group};
+    use crate::file::metadata::thrift::tests::{read_column_chunk, read_row_group};
 
     #[test]
     fn test_row_group_metadata_thrift_conversion() {
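The `build_unchecked` added above gives crate-internal callers a way to skip the invariant checks that `build` performs, which saves work when the fields were just produced by a decoder that already validated them. A hedged sketch of this checked/unchecked builder pattern, with simplified hypothetical types:

```rust
// Sketch of a builder exposing a validating `build` plus a cheaper
// `build_unchecked`; the types and checks are illustrative only.
struct RowGroup {
    num_rows: i64,
    total_byte_size: i64,
}

struct RowGroupBuilder(RowGroup);

impl RowGroupBuilder {
    fn new() -> Self {
        RowGroupBuilder(RowGroup { num_rows: 0, total_byte_size: 0 })
    }

    fn set_num_rows(mut self, n: i64) -> Self {
        self.0.num_rows = n;
        self
    }

    // Validate invariants before handing out the value.
    fn build(self) -> Result<RowGroup, String> {
        if self.0.num_rows < 0 {
            return Err("num_rows must be non-negative".into());
        }
        Ok(self.0)
    }

    // Skip validation; the caller guarantees the invariants hold,
    // e.g. because the fields came from an already-validated decode.
    fn build_unchecked(self) -> RowGroup {
        self.0
    }
}

fn main() {
    let rg = RowGroupBuilder::new().set_num_rows(10).build().unwrap();
    assert_eq!(rg.num_rows, 10);
    let fast = RowGroupBuilder::new().build_unchecked();
    assert_eq!(fast.total_byte_size, 0);
}
```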
parquet/src/file/metadata/parser.rs (4 changes: 2 additions & 2 deletions)
@@ -21,7 +21,7 @@
 //! into the corresponding Rust structures
 
 use crate::errors::ParquetError;
-use crate::file::metadata::thrift_gen::parquet_metadata_from_bytes;
+use crate::file::metadata::thrift::parquet_metadata_from_bytes;
 use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
 
 use crate::file::page_index::column_index::ColumnIndexMetaData;
@@ -72,7 +72,7 @@ mod inner {
             encrypted_footer: bool,
         ) -> Result<ParquetMetaData> {
             if encrypted_footer || self.file_decryption_properties.is_some() {
-                crate::file::metadata::encryption::parquet_metadata_with_encryption(
+                crate::file::metadata::thrift::encryption::parquet_metadata_with_encryption(
                     self.file_decryption_properties.as_deref(),
                     encrypted_footer,
                     buf,
parquet/src/file/metadata/encryption.rs → parquet/src/file/metadata/thrift/encryption.rs
@@ -15,28 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// a collection of generated structs used to parse thrift metadata
+//! Encryption support for Thrift serialization
 
 use std::io::Write;
 
 use crate::{
-    basic::{Compression, EncodingMask},
     encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
     errors::{ParquetError, Result},
     file::{
        column_crypto_metadata::ColumnCryptoMetaData,
        metadata::{
-            HeapSize, LevelHistogram, PageEncodingStats, ParquetMetaData, RowGroupMetaData,
-            thrift_gen::{
-                GeospatialStatistics, SizeStatistics, Statistics, convert_geo_stats, convert_stats,
-                parquet_metadata_from_bytes,
-            },
+            HeapSize, ParquetMetaData, RowGroupMetaData,
+            thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
        },
    },
    parquet_thrift::{
        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
-        read_thrift_vec,
    },
    thrift_struct, thrift_union,
 };
@@ -180,41 +175,9 @@ fn row_group_from_encrypted_thrift(
             })?;
 
             // parse decrypted buffer and then replace fields in 'c'
-            let col_meta = read_column_metadata(decrypted_cc_buf.as_slice())?;
-
-            let (
-                unencoded_byte_array_data_bytes,
-                repetition_level_histogram,
-                definition_level_histogram,
-            ) = if let Some(size_stats) = col_meta.size_statistics {
-                (
-                    size_stats.unencoded_byte_array_data_bytes,
-                    size_stats.repetition_level_histogram,
-                    size_stats.definition_level_histogram,
-                )
-            } else {
-                (None, None, None)
-            };
-
-            let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
-            let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
-
-            c.encodings = col_meta.encodings;
-            c.compression = col_meta.codec;
-            c.num_values = col_meta.num_values;
-            c.total_uncompressed_size = col_meta.total_uncompressed_size;
-            c.total_compressed_size = col_meta.total_compressed_size;
-            c.data_page_offset = col_meta.data_page_offset;
-            c.index_page_offset = col_meta.index_page_offset;
-            c.dictionary_page_offset = col_meta.dictionary_page_offset;
-            c.statistics = convert_stats(d.physical_type(), col_meta.statistics)?;
-            c.encoding_stats = col_meta.encoding_stats;
-            c.bloom_filter_offset = col_meta.bloom_filter_offset;
-            c.bloom_filter_length = col_meta.bloom_filter_length;
-            c.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
-            c.repetition_level_histogram = repetition_level_histogram;
-            c.definition_level_histogram = definition_level_histogram;
-            c.geo_statistics = convert_geo_stats(col_meta.geospatial_statistics);
+            let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
+            let mask = read_column_metadata(&mut prot, &mut c)?;
+            validate_column_metadata(mask)?;
 
             columns.push(c);
         } else {
@@ -373,172 +336,3 @@ fn get_file_decryptor(
         )),
     }
 }
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-struct ColumnMetaData<'a> {
-    encodings: EncodingMask,
-    codec: Compression,
-    num_values: i64,
-    total_uncompressed_size: i64,
-    total_compressed_size: i64,
-    data_page_offset: i64,
-    index_page_offset: Option<i64>,
-    dictionary_page_offset: Option<i64>,
-    statistics: Option<Statistics<'a>>,
-    encoding_stats: Option<Vec<PageEncodingStats>>,
-    bloom_filter_offset: Option<i64>,
-    bloom_filter_length: Option<i32>,
-    size_statistics: Option<SizeStatistics>,
-    geospatial_statistics: Option<GeospatialStatistics>,
-}
-
-fn read_column_metadata<'a>(buf: &'a [u8]) -> Result<ColumnMetaData<'a>> {
-    let mut prot = ThriftSliceInputProtocol::new(buf);
-
-    let mut encodings: Option<EncodingMask> = None;
-    let mut codec: Option<Compression> = None;
-    let mut num_values: Option<i64> = None;
-    let mut total_uncompressed_size: Option<i64> = None;
-    let mut total_compressed_size: Option<i64> = None;
-    let mut data_page_offset: Option<i64> = None;
-    let mut index_page_offset: Option<i64> = None;
-    let mut dictionary_page_offset: Option<i64> = None;
-    let mut statistics: Option<Statistics> = None;
-    let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
-    let mut bloom_filter_offset: Option<i64> = None;
-    let mut bloom_filter_length: Option<i32> = None;
-    let mut size_statistics: Option<SizeStatistics> = None;
-    let mut geospatial_statistics: Option<GeospatialStatistics> = None;
-
-    // `ColumnMetaData`. Read inline for performance sake.
-    // struct ColumnMetaData {
-    //   1: required Type type
-    //   2: required list<Encoding> encodings
-    //   3: required list<string> path_in_schema
-    //   4: required CompressionCodec codec
-    //   5: required i64 num_values
-    //   6: required i64 total_uncompressed_size
-    //   7: required i64 total_compressed_size
-    //   8: optional list<KeyValue> key_value_metadata
-    //   9: required i64 data_page_offset
-    //   10: optional i64 index_page_offset
-    //   11: optional i64 dictionary_page_offset
-    //   12: optional Statistics statistics;
-    //   13: optional list<PageEncodingStats> encoding_stats;
-    //   14: optional i64 bloom_filter_offset;
-    //   15: optional i32 bloom_filter_length;
-    //   16: optional SizeStatistics size_statistics;
-    //   17: optional GeospatialStatistics geospatial_statistics;
-    // }
-    let mut last_field_id = 0i16;
-    loop {
-        let field_ident = prot.read_field_begin(last_field_id)?;
-        if field_ident.field_type == FieldType::Stop {
-            break;
-        }
-        match field_ident.id {
-            // 1: type is never used, we can use the column descriptor
-            2 => {
-                let val = EncodingMask::read_thrift(&mut prot)?;
-                encodings = Some(val);
-            }
-            // 3: path_in_schema is redundant
-            4 => {
-                codec = Some(Compression::read_thrift(&mut prot)?);
-            }
-            5 => {
-                num_values = Some(i64::read_thrift(&mut prot)?);
-            }
-            6 => {
-                total_uncompressed_size = Some(i64::read_thrift(&mut prot)?);
-            }
-            7 => {
-                total_compressed_size = Some(i64::read_thrift(&mut prot)?);
-            }
-            // 8: we don't expose this key value
-            9 => {
-                data_page_offset = Some(i64::read_thrift(&mut prot)?);
-            }
-            10 => {
-                index_page_offset = Some(i64::read_thrift(&mut prot)?);
-            }
-            11 => {
-                dictionary_page_offset = Some(i64::read_thrift(&mut prot)?);
-            }
-            12 => {
-                statistics = Some(Statistics::read_thrift(&mut prot)?);
-            }
-            13 => {
-                let val =
-                    read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut prot)?;
-                encoding_stats = Some(val);
-            }
-            14 => {
-                bloom_filter_offset = Some(i64::read_thrift(&mut prot)?);
-            }
-            15 => {
-                bloom_filter_length = Some(i32::read_thrift(&mut prot)?);
-            }
-            16 => {
-                let val = SizeStatistics::read_thrift(&mut prot)?;
-                size_statistics = Some(val);
-            }
-            17 => {
-                let val = GeospatialStatistics::read_thrift(&mut prot)?;
-                geospatial_statistics = Some(val);
-            }
-            _ => {
-                prot.skip(field_ident.field_type)?;
-            }
-        };
-        last_field_id = field_ident.id;
-    }
-
-    let Some(encodings) = encodings else {
-        return Err(ParquetError::General(
-            "Required field encodings is missing".to_owned(),
-        ));
-    };
-    let Some(codec) = codec else {
-        return Err(ParquetError::General(
-            "Required field codec is missing".to_owned(),
-        ));
-    };
-    let Some(num_values) = num_values else {
-        return Err(ParquetError::General(
-            "Required field num_values is missing".to_owned(),
-        ));
-    };
-    let Some(total_uncompressed_size) = total_uncompressed_size else {
-        return Err(ParquetError::General(
-            "Required field total_uncompressed_size is missing".to_owned(),
-        ));
-    };
-    let Some(total_compressed_size) = total_compressed_size else {
-        return Err(ParquetError::General(
-            "Required field total_compressed_size is missing".to_owned(),
-        ));
-    };
-    let Some(data_page_offset) = data_page_offset else {
-        return Err(ParquetError::General(
-            "Required field data_page_offset is missing".to_owned(),
-        ));
-    };
-
-    Ok(ColumnMetaData {
-        encodings,
-        num_values,
-        codec,
-        total_uncompressed_size,
-        total_compressed_size,
-        data_page_offset,
-        index_page_offset,
-        dictionary_page_offset,
-        statistics,
-        encoding_stats,
-        bloom_filter_offset,
-        bloom_filter_length,
-        size_statistics,
-        geospatial_statistics,
-    })
-}
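The deleted `read_column_metadata` above accumulated every field into a local `Option` and then checked the required ones one at a time. Its replacement in the `thrift` module decodes directly into the existing column chunk metadata and returns a mask of the field IDs it saw, which `validate_column_metadata` checks in a single step. A small self-contained sketch of that seen-field bitmask idea; the constant and helper names here are illustrative, not the crate's actual API:

```rust
// Required ColumnMetaData field IDs (from the Thrift struct quoted
// above), encoded as bits of a u32 presence mask.
const REQUIRED_FIELDS: u32 = (1 << 2) // encodings
    | (1 << 4)  // codec
    | (1 << 5)  // num_values
    | (1 << 6)  // total_uncompressed_size
    | (1 << 7)  // total_compressed_size
    | (1 << 9); // data_page_offset

// One pass over the mask replaces the per-field `let Some(..) else`
// checks in the deleted code.
fn validate_mask(seen: u32) -> Result<(), String> {
    let missing = REQUIRED_FIELDS & !seen;
    if missing == 0 {
        return Ok(());
    }
    Err(format!(
        "required ColumnMetaData field id {} is missing",
        missing.trailing_zeros()
    ))
}

fn main() {
    // A decoder would set one bit per field id as it reads the struct.
    let mut seen = 0u32;
    for id in [2u32, 4, 5, 6, 7, 9] {
        seen |= 1u32 << id;
    }
    assert!(validate_mask(seen).is_ok());
    // Dropping a required field (codec, id 4) is reported as an error.
    assert!(validate_mask(seen & !(1u32 << 4)).is_err());
}
```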