From 31d68919d8d4e3d98f2121a1ef16454137b7d9c6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 1 Oct 2024 18:19:28 -0400 Subject: [PATCH] Add `ParquetMetaDataBuilder` (#6466) * Add `ParquetMetaDataBuilder` * Add accessors for ColumnIndex / OffsetIndex * Deprecate ParquetMetaData::new_with_page_index * simplify reading metadata * Revert "simplify reading metadata" This reverts commit c0432e6cb1d0eca54b1214c5b819b8fdbf169662. --- parquet/src/arrow/async_reader/mod.rs | 17 +-- parquet/src/file/metadata/mod.rs | 173 +++++++++++++++++++++++--- parquet/src/file/serialized_reader.rs | 43 +++---- 3 files changed, 186 insertions(+), 47 deletions(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 5e8bdbc02eb1..b521ac0102d8 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -1556,13 +1556,16 @@ mod tests { index_reader::read_offset_indexes(&data, metadata.row_group(0).columns()) .expect("reading offset index"); - let row_group_meta = metadata.row_group(0).clone(); - let metadata = ParquetMetaData::new_with_page_index( - metadata.file_metadata().clone(), - vec![row_group_meta], - None, - Some(vec![offset_index.clone()]), - ); + let mut metadata_builder = metadata.into_builder(); + let mut row_groups = metadata_builder.take_row_groups(); + row_groups.truncate(1); + let row_group_meta = row_groups.pop().unwrap(); + + let metadata = metadata_builder + .add_row_group(row_group_meta) + .set_column_index(None) + .set_offset_index(Some(vec![offset_index.clone()])) + .build(); let metadata = Arc::new(metadata); diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 05bc261a6649..30b17b6a2f78 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -156,6 +156,7 @@ pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>; /// defined by [`parquet.thrift`]. 
/// /// # Overview +/// The fields of this structure are: /// * [`FileMetaData`]: Information about the overall file (such as the schema) (See [`Self::file_metadata`]) /// * [`RowGroupMetaData`]: Information about each Row Group (see [`Self::row_groups`]) /// * [`ParquetColumnIndex`] and [`ParquetOffsetIndex`]: Optional "Page Index" structures (see [`Self::column_index`] and [`Self::offset_index`]) @@ -163,6 +164,8 @@ pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>; /// This structure is read by the various readers in this crate or can be read /// directly from a file using the [`ParquetMetaDataReader`] struct. /// +/// See the [`ParquetMetaDataBuilder`] to create and modify this structure. +/// /// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift #[derive(Debug, Clone, PartialEq)] pub struct ParquetMetaData { @@ -190,18 +193,23 @@ impl ParquetMetaData { /// Creates Parquet metadata from file metadata, a list of row /// group metadata, and the column index structures. + #[deprecated(note = "Use ParquetMetaDataBuilder")] pub fn new_with_page_index( file_metadata: FileMetaData, row_groups: Vec<RowGroupMetaData>, column_index: Option<ParquetColumnIndex>, offset_index: Option<ParquetOffsetIndex>, ) -> Self { - ParquetMetaData { - file_metadata, - row_groups, - column_index, - offset_index, - } + ParquetMetaDataBuilder::new(file_metadata) + .set_row_groups(row_groups) + .set_column_index(column_index) + .set_offset_index(offset_index) + .build() + } + + /// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`] + pub fn into_builder(self) -> ParquetMetaDataBuilder { + self.into() } /// Returns file metadata as reference. 
@@ -290,6 +298,127 @@ impl ParquetMetaData { } } +/// A builder for creating / manipulating [`ParquetMetaData`] +/// +/// # Example creating a new [`ParquetMetaData`] +/// +///```no_run +/// # use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataBuilder, RowGroupMetaData, RowGroupMetaDataBuilder}; +/// # fn get_file_metadata() -> FileMetaData { unimplemented!(); } +/// // Create a new builder given the file metadata +/// let file_metadata = get_file_metadata(); +/// // Create a row group +/// let row_group = RowGroupMetaData::builder(file_metadata.schema_descr_ptr()) +/// .set_num_rows(100) +/// // ... (A real row group needs more than just the number of rows) +/// .build() +/// .unwrap(); +/// // Create the final metadata +/// let metadata: ParquetMetaData = ParquetMetaDataBuilder::new(file_metadata) +/// .add_row_group(row_group) +/// .build(); +/// ``` +/// +/// # Example modifying an existing [`ParquetMetaData`] +/// ```no_run +/// # use parquet::file::metadata::ParquetMetaData; +/// # fn load_metadata() -> ParquetMetaData { unimplemented!(); } +/// // Modify the metadata so only the last RowGroup remains +/// let metadata: ParquetMetaData = load_metadata(); +/// let mut builder = metadata.into_builder(); +/// +/// // Take existing row groups to modify +/// let mut row_groups = builder.take_row_groups(); +/// let last_row_group = row_groups.pop().unwrap(); +/// +/// let metadata = builder +/// .add_row_group(last_row_group) +/// .build(); +/// ``` +pub struct ParquetMetaDataBuilder(ParquetMetaData); + +impl ParquetMetaDataBuilder { + /// Create a new builder from a file metadata, with no row groups + pub fn new(file_meta_data: FileMetaData) -> Self { + Self(ParquetMetaData::new(file_meta_data, vec![])) + } + + /// Create a new builder from an existing ParquetMetaData + pub fn new_from_metadata(metadata: ParquetMetaData) -> Self { + Self(metadata) + } + + /// Adds a row group to the metadata + pub fn add_row_group(mut self, row_group: 
RowGroupMetaData) -> Self { + self.0.row_groups.push(row_group); + self + } + + /// Sets all the row groups to the specified list + pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self { + self.0.row_groups = row_groups; + self + } + + /// Takes ownership of the row groups in this builder, and clears the list + /// of row groups. + /// + /// This can be used for more efficient creation of a new ParquetMetaData + /// from an existing one. + pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> { + std::mem::take(&mut self.0.row_groups) + } + + /// Return a reference to the current row groups + pub fn row_groups(&self) -> &[RowGroupMetaData] { + &self.0.row_groups + } + + /// Sets the column index + pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self { + self.0.column_index = column_index; + self + } + + /// Returns the current column index from the builder, replacing it with `None` + pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> { + std::mem::take(&mut self.0.column_index) + } + + /// Return a reference to the current column index, if any + pub fn column_index(&self) -> Option<&ParquetColumnIndex> { + self.0.column_index.as_ref() + } + + /// Sets the offset index + pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self { + self.0.offset_index = offset_index; + self + } + + /// Returns the current offset index from the builder, replacing it with `None` + pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> { + std::mem::take(&mut self.0.offset_index) + } + + /// Return a reference to the current offset index, if any + pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> { + self.0.offset_index.as_ref() + } + + /// Creates a new ParquetMetaData from the builder + pub fn build(self) -> ParquetMetaData { + let Self(metadata) = self; + metadata + } +} + +impl From<ParquetMetaData> for ParquetMetaDataBuilder { + fn from(meta_data: ParquetMetaData) -> Self { + Self(meta_data) + } +} + pub type KeyValue = crate::format::KeyValue; /// Reference counted pointer for [`FileMetaData`]. 
@@ -566,12 +695,27 @@ impl RowGroupMetaDataBuilder { self } + /// Takes ownership of the column metadata in this builder, and clears + /// the list of columns. + /// + /// This can be used for more efficient creation of a new RowGroupMetaData + /// from an existing one. + pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> { + std::mem::take(&mut self.0.columns) + } + /// Sets column metadata for this row group. pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self { self.0.columns = value; self } + /// Adds a column metadata to this row group + pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self { + self.0.columns.push(value); + self + } + /// Sets ordinal for this row group. pub fn set_ordinal(mut self, value: i16) -> Self { self.0.ordinal = Some(value); @@ -1672,7 +1816,9 @@ mod tests { .unwrap(); let row_group_meta_with_stats = vec![row_group_meta_with_stats]; - let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta_with_stats); + let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone()) + .set_row_groups(row_group_meta_with_stats) + .build(); let base_expected_size = 2312; assert_eq!(parquet_meta.memory_size(), base_expected_size); @@ -1692,14 +1838,13 @@ mod tests { offset_index.append_unencoded_byte_array_data_bytes(Some(10)); let offset_index = offset_index.build_to_thrift(); - let parquet_meta = ParquetMetaData::new_with_page_index( - file_metadata, - row_group_meta, - Some(vec![vec![Index::BOOLEAN(native_index)]]), - Some(vec![vec![ + let parquet_meta = ParquetMetaDataBuilder::new(file_metadata) + .set_row_groups(row_group_meta) + .set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]])) + .set_offset_index(Some(vec![vec![ OffsetIndexMetaData::try_new(offset_index).unwrap() - ]]), - ); + ]])) + .build(); let bigger_expected_size = 2816; // more set fields means more memory usage diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 
b253b73a4fa0..ddb7d0df9052 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -191,11 +191,13 @@ impl SerializedFileReader { /// Creates file reader from a Parquet file with read options. /// Returns error if Parquet file does not exist or is corrupt. pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> { - let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?; + let mut metadata_builder = ParquetMetaDataReader::new() + .parse_and_finish(&chunk_reader)? + .into_builder(); let mut predicates = options.predicates; - let row_groups = metadata.row_groups().to_vec(); - let mut filtered_row_groups = Vec::<RowGroupMetaData>::new(); - for (i, rg_meta) in row_groups.into_iter().enumerate() { + + // Filter row groups based on the predicates + for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() { let mut keep = true; for predicate in &mut predicates { if !predicate(&rg_meta, i) { @@ -204,7 +206,7 @@ impl SerializedFileReader { } } if keep { - filtered_row_groups.push(rg_meta); + metadata_builder = metadata_builder.add_row_group(rg_meta); } } @@ -212,33 +214,22 @@ impl SerializedFileReader { let mut columns_indexes = vec![]; let mut offset_indexes = vec![]; - for rg in &mut filtered_row_groups { + for rg in metadata_builder.row_groups().iter() { let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?; let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?; columns_indexes.push(column_index); offset_indexes.push(offset_index); } - - Ok(Self { - chunk_reader: Arc::new(chunk_reader), - metadata: Arc::new(ParquetMetaData::new_with_page_index( - metadata.file_metadata().clone(), - filtered_row_groups, - Some(columns_indexes), - Some(offset_indexes), - )), - props: Arc::new(options.props), - }) - } else { - Ok(Self { - chunk_reader: Arc::new(chunk_reader), - metadata: Arc::new(ParquetMetaData::new( - 
metadata.file_metadata().clone(), - filtered_row_groups, - )), - props: Arc::new(options.props), - }) + metadata_builder = metadata_builder + .set_column_index(Some(columns_indexes)) + .set_offset_index(Some(offset_indexes)); } + + Ok(Self { + chunk_reader: Arc::new(chunk_reader), + metadata: Arc::new(metadata_builder.build()), + props: Arc::new(options.props), + }) } }