Skip to content

Commit a0db198

Browse files
authored
Add options to control various aspects of Parquet metadata decoding (#8763)
1 parent a14f77c commit a0db198

File tree

12 files changed

+342
-33
lines changed

12 files changed

+342
-33
lines changed

parquet/benches/metadata.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use std::sync::Arc;
2020

2121
use parquet::basic::{Encoding, PageType, Type as PhysicalType};
2222
use parquet::file::metadata::{
23-
ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataReader,
24-
ParquetMetaDataWriter, RowGroupMetaData,
23+
ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions,
24+
ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData,
2525
};
2626
use parquet::file::statistics::Statistics;
2727
use parquet::file::writer::TrackedWrite;
@@ -164,12 +164,29 @@ fn criterion_benchmark(c: &mut Criterion) {
164164
})
165165
});
166166

167+
let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap();
168+
let options = ParquetMetaDataOptions::new().with_schema(schema);
169+
c.bench_function("decode metadata with schema", |b| {
170+
b.iter(|| {
171+
ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
172+
.unwrap();
173+
})
174+
});
175+
167176
let buf: Bytes = black_box(encoded_meta()).into();
168177
c.bench_function("decode parquet metadata (wide)", |b| {
169178
b.iter(|| {
170179
ParquetMetaDataReader::decode_metadata(&buf).unwrap();
171180
})
172181
});
182+
183+
let schema = ParquetMetaDataReader::decode_schema(&buf).unwrap();
184+
let options = ParquetMetaDataOptions::new().with_schema(schema);
185+
c.bench_function("decode metadata (wide) with schema", |b| {
186+
b.iter(|| {
187+
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
188+
})
189+
});
173190
}
174191

175192
criterion_group!(benches, criterion_benchmark);

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ use crate::column::page::{PageIterator, PageReader};
3838
#[cfg(feature = "encryption")]
3939
use crate::encryption::decrypt::FileDecryptionProperties;
4040
use crate::errors::{ParquetError, Result};
41-
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
41+
use crate::file::metadata::{
42+
PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
43+
};
4244
use crate::file::reader::{ChunkReader, SerializedPageReader};
4345
use crate::schema::types::SchemaDescriptor;
4446

@@ -404,6 +406,8 @@ pub struct ArrowReaderOptions {
404406
supplied_schema: Option<SchemaRef>,
405407
/// Policy for reading offset and column indexes.
406408
pub(crate) page_index_policy: PageIndexPolicy,
409+
/// Options to control reading of Parquet metadata
410+
metadata_options: ParquetMetaDataOptions,
407411
/// If encryption is enabled, the file decryption properties can be provided
408412
#[cfg(feature = "encryption")]
409413
pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -523,6 +527,16 @@ impl ArrowReaderOptions {
523527
}
524528
}
525529

530+
/// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
531+
/// footer will be skipped.
532+
///
533+
/// This can be used to avoid reparsing the schema from the file when it is
534+
/// already known.
535+
pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
536+
self.metadata_options.set_schema(schema);
537+
self
538+
}
539+
526540
/// Provide the file decryption properties to use when reading encrypted parquet files.
527541
///
528542
/// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
@@ -544,6 +558,11 @@ impl ArrowReaderOptions {
544558
self.page_index_policy != PageIndexPolicy::Skip
545559
}
546560

561+
/// Retrieve the currently set metadata decoding options.
562+
pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
563+
&self.metadata_options
564+
}
565+
547566
/// Retrieve the currently set file decryption properties.
548567
///
549568
/// This can be set via
@@ -591,8 +610,9 @@ impl ArrowReaderMetadata {
591610
/// `Self::metadata` is missing the page index, this function will attempt
592611
/// to load the page index by making an object store request.
593612
pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
594-
let metadata =
595-
ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
613+
let metadata = ParquetMetaDataReader::new()
614+
.with_page_index_policy(options.page_index_policy)
615+
.with_metadata_options(Some(options.metadata_options.clone()));
596616
#[cfg(feature = "encryption")]
597617
let metadata = metadata.with_decryption_properties(
598618
options.file_decryption_properties.as_ref().map(Arc::clone),
@@ -1246,6 +1266,22 @@ mod tests {
12461266
assert_eq!(original_schema.fields(), reader.schema().fields());
12471267
}
12481268

1269+
#[test]
1270+
fn test_reuse_schema() {
1271+
let file = get_test_file("parquet/alltypes-java.parquet");
1272+
1273+
let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
1274+
let expected = builder.metadata;
1275+
let schema = expected.file_metadata().schema_descr_ptr();
1276+
1277+
let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
1278+
let builder =
1279+
ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
1280+
1281+
// Verify that the metadata matches
1282+
assert_eq!(expected.as_ref(), builder.metadata.as_ref());
1283+
}
1284+
12491285
#[test]
12501286
fn test_arrow_reader_single_column() {
12511287
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

parquet/src/arrow/async_reader/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,12 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
164164
options: Option<&'a ArrowReaderOptions>,
165165
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
166166
async move {
167-
let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
168-
PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
169-
);
167+
let metadata_opts = options.map(|o| o.metadata_options().clone());
168+
let metadata_reader = ParquetMetaDataReader::new()
169+
.with_page_index_policy(PageIndexPolicy::from(
170+
options.is_some_and(|o| o.page_index()),
171+
))
172+
.with_metadata_options(metadata_opts);
170173

171174
#[cfg(feature = "encryption")]
172175
let metadata_reader = metadata_reader.with_decryption_properties(

parquet/src/arrow/async_reader/store.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ impl AsyncFileReader for ParquetObjectReader {
199199
options: Option<&'a ArrowReaderOptions>,
200200
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
201201
Box::pin(async move {
202+
let metadata_opts = options.map(|o| o.metadata_options().clone());
202203
let mut metadata = ParquetMetaDataReader::new()
204+
.with_metadata_options(metadata_opts)
203205
.with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
204206
.with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
205207
.with_prefetch_hint(self.metadata_size_hint);

parquet/src/file/metadata/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
//! ```
8989
mod footer_tail;
9090
mod memory;
91+
mod options;
9192
mod parser;
9293
mod push_decoder;
9394
pub(crate) mod reader;
@@ -127,6 +128,7 @@ use crate::{
127128
};
128129

129130
pub use footer_tail::FooterTail;
131+
pub use options::ParquetMetaDataOptions;
130132
pub use push_decoder::ParquetMetaDataPushDecoder;
131133
pub use reader::{PageIndexPolicy, ParquetMetaDataReader};
132134
use std::io::Write;
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Options used to control metadata parsing
19+
20+
use crate::schema::types::SchemaDescPtr;
21+
22+
/// Options that can be set to control what parts of the Parquet file footer
23+
/// metadata will be decoded and made present in the [`ParquetMetaData`] returned
24+
/// by [`ParquetMetaDataReader`] and [`ParquetMetaDataPushDecoder`].
25+
///
26+
/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
27+
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
28+
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
29+
#[derive(Default, Debug, Clone)]
30+
pub struct ParquetMetaDataOptions {
31+
schema_descr: Option<SchemaDescPtr>,
32+
}
33+
34+
impl ParquetMetaDataOptions {
35+
/// Return a new default [`ParquetMetaDataOptions`].
36+
pub fn new() -> Self {
37+
Default::default()
38+
}
39+
40+
/// Returns an optional [`SchemaDescPtr`] to use when decoding. If this is not `None` then
41+
/// the schema in the footer will be skipped.
42+
pub fn schema(&self) -> Option<&SchemaDescPtr> {
43+
self.schema_descr.as_ref()
44+
}
45+
46+
/// Provide a schema to use when decoding the metadata.
47+
pub fn set_schema(&mut self, val: SchemaDescPtr) {
48+
self.schema_descr = Some(val);
49+
}
50+
51+
/// Provide a schema to use when decoding the metadata. Returns `Self` for chaining.
52+
pub fn with_schema(mut self, val: SchemaDescPtr) -> Self {
53+
self.schema_descr = Some(val);
54+
self
55+
}
56+
}
57+
58+
#[cfg(test)]
59+
mod tests {
60+
use bytes::Bytes;
61+
62+
use crate::{
63+
DecodeResult,
64+
file::metadata::{ParquetMetaDataOptions, ParquetMetaDataPushDecoder},
65+
util::test_common::file_util::get_test_file,
66+
};
67+
use std::{io::Read, sync::Arc};
68+
69+
#[test]
70+
fn test_provide_schema() {
71+
let mut buf: Vec<u8> = Vec::new();
72+
get_test_file("alltypes_plain.parquet")
73+
.read_to_end(&mut buf)
74+
.unwrap();
75+
76+
let data = Bytes::from(buf);
77+
let mut decoder = ParquetMetaDataPushDecoder::try_new(data.len() as u64).unwrap();
78+
decoder
79+
.push_range(0..data.len() as u64, data.clone())
80+
.unwrap();
81+
82+
let expected = match decoder.try_decode().unwrap() {
83+
DecodeResult::Data(m) => m,
84+
_ => panic!("could not parse metadata"),
85+
};
86+
let expected_schema = expected.file_metadata().schema_descr_ptr();
87+
88+
let mut options = ParquetMetaDataOptions::new();
89+
options.set_schema(expected_schema);
90+
let options = Arc::new(options);
91+
92+
let mut decoder = ParquetMetaDataPushDecoder::try_new(data.len() as u64)
93+
.unwrap()
94+
.with_metadata_options(Some(options));
95+
decoder.push_range(0..data.len() as u64, data).unwrap();
96+
let metadata = match decoder.try_decode().unwrap() {
97+
DecodeResult::Data(m) => m,
98+
_ => panic!("could not parse metadata"),
99+
};
100+
101+
assert_eq!(expected, metadata);
102+
// the schema pointers should be the same
103+
assert!(Arc::ptr_eq(
104+
&expected.file_metadata().schema_descr_ptr(),
105+
&metadata.file_metadata().schema_descr_ptr()
106+
));
107+
}
108+
}

parquet/src/file/metadata/parser.rs

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
2323
use crate::errors::ParquetError;
2424
use crate::file::metadata::thrift::parquet_metadata_from_bytes;
25-
use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
25+
use crate::file::metadata::{
26+
ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions,
27+
};
2628

2729
use crate::file::page_index::column_index::ColumnIndexMetaData;
2830
use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
@@ -51,6 +53,8 @@ mod inner {
5153
pub(crate) struct MetadataParser {
5254
// the credentials and keys needed to decrypt metadata
5355
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
56+
// metadata parsing options
57+
metadata_options: Option<Arc<ParquetMetaDataOptions>>,
5458
}
5559

5660
impl MetadataParser {
@@ -66,6 +70,16 @@ mod inner {
6670
self
6771
}
6872

73+
pub(crate) fn with_metadata_options(
74+
self,
75+
options: Option<Arc<ParquetMetaDataOptions>>,
76+
) -> Self {
77+
Self {
78+
metadata_options: options,
79+
..self
80+
}
81+
}
82+
6983
pub(crate) fn decode_metadata(
7084
&self,
7185
buf: &[u8],
@@ -76,9 +90,10 @@ mod inner {
7690
self.file_decryption_properties.as_ref(),
7791
encrypted_footer,
7892
buf,
93+
self.metadata_options.as_deref(),
7994
)
8095
} else {
81-
decode_metadata(buf)
96+
decode_metadata(buf, self.metadata_options.as_deref())
8297
}
8398
}
8499
}
@@ -144,15 +159,28 @@ mod inner {
144159
mod inner {
145160
use super::*;
146161
use crate::errors::Result;
162+
use std::sync::Arc;
147163
/// parallel implementation when encryption feature is not enabled
148164
///
149165
/// This has the same API as the encryption-enabled version
150166
#[derive(Debug, Default)]
151-
pub(crate) struct MetadataParser;
167+
pub(crate) struct MetadataParser {
168+
// metadata parsing options
169+
metadata_options: Option<Arc<ParquetMetaDataOptions>>,
170+
}
152171

153172
impl MetadataParser {
154173
pub(crate) fn new() -> Self {
155-
MetadataParser
174+
MetadataParser::default()
175+
}
176+
177+
pub(crate) fn with_metadata_options(
178+
self,
179+
options: Option<Arc<ParquetMetaDataOptions>>,
180+
) -> Self {
181+
Self {
182+
metadata_options: options,
183+
}
156184
}
157185

158186
pub(crate) fn decode_metadata(
@@ -165,7 +193,7 @@ mod inner {
165193
"Parquet file has an encrypted footer but the encryption feature is disabled"
166194
))
167195
} else {
168-
decode_metadata(buf)
196+
decode_metadata(buf, self.metadata_options.as_deref())
169197
}
170198
}
171199
}
@@ -198,8 +226,11 @@ mod inner {
198226
/// by the [Parquet Spec].
199227
///
200228
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
201-
pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
202-
parquet_metadata_from_bytes(buf)
229+
pub(crate) fn decode_metadata(
230+
buf: &[u8],
231+
options: Option<&ParquetMetaDataOptions>,
232+
) -> crate::errors::Result<ParquetMetaData> {
233+
parquet_metadata_from_bytes(buf, options)
203234
}
204235

205236
/// Parses column index from the provided bytes and adds it to the metadata.

0 commit comments

Comments
 (0)