Add ParquetMetaDataBuilder (#6466)
* Add `ParquetMetaDataBuilder`

* Add accessors for ColumnIndex / OffsetIndex

* Deprecate ParquetMetaData::new_with_page_index

* simplify reading metadata

* Revert "simplify reading metadata"

This reverts commit c0432e6.
alamb authored Oct 1, 2024
1 parent bea9daa commit 31d6891
Showing 3 changed files with 186 additions and 47 deletions.
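
Before the file diffs, a brief sketch of the builder workflow this commit introduces. The helper function and its name are illustrative only; the methods it calls (`into_builder`, `take_row_groups`, `add_row_group`, `set_column_index`, `set_offset_index`, `build`) are the ones added in the diffs below.

```rust
use parquet::file::metadata::ParquetMetaData;

// Illustrative helper (not part of the crate): rebuild metadata so that
// only the last row group remains, using the builder added by this commit.
fn keep_only_last_row_group(metadata: ParquetMetaData) -> ParquetMetaData {
    let mut builder = metadata.into_builder();

    // Take ownership of the existing row groups without cloning them.
    let mut row_groups = builder.take_row_groups();
    let last = row_groups.pop().expect("metadata has at least one row group");

    builder
        .add_row_group(last)
        // Any previously loaded page index would no longer line up with
        // the single retained row group, so drop it here.
        .set_column_index(None)
        .set_offset_index(None)
        .build()
}
```
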
17 changes: 10 additions & 7 deletions parquet/src/arrow/async_reader/mod.rs
@@ -1556,13 +1556,16 @@ mod tests {
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
.expect("reading offset index");

let row_group_meta = metadata.row_group(0).clone();
let metadata = ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
vec![row_group_meta],
None,
Some(vec![offset_index.clone()]),
);
let mut metadata_builder = metadata.into_builder();
let mut row_groups = metadata_builder.take_row_groups();
row_groups.truncate(1);
let row_group_meta = row_groups.pop().unwrap();

let metadata = metadata_builder
.add_row_group(row_group_meta)
.set_column_index(None)
.set_offset_index(Some(vec![offset_index.clone()]))
.build();

let metadata = Arc::new(metadata);

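
The test above replaces a call to the now-deprecated `ParquetMetaData::new_with_page_index` with the builder. Callers that want a mechanical migration can reproduce the deprecated constructor with the builder alone, mirroring the delegation shown in the next file; a free-standing sketch (the wrapper function itself is hypothetical):

```rust
use parquet::file::metadata::{
    FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetMetaDataBuilder,
    ParquetOffsetIndex, RowGroupMetaData,
};

// Equivalent of the deprecated ParquetMetaData::new_with_page_index,
// expressed with the new builder API.
fn new_with_page_index(
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
    column_index: Option<ParquetColumnIndex>,
    offset_index: Option<ParquetOffsetIndex>,
) -> ParquetMetaData {
    ParquetMetaDataBuilder::new(file_metadata)
        .set_row_groups(row_groups)
        .set_column_index(column_index)
        .set_offset_index(offset_index)
        .build()
}
```
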
173 changes: 159 additions & 14 deletions parquet/src/file/metadata/mod.rs
@@ -156,13 +156,16 @@ pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
/// defined by [`parquet.thrift`].
///
/// # Overview
/// The fields of this structure are:
/// * [`FileMetaData`]: Information about the overall file (such as the schema) (See [`Self::file_metadata`])
/// * [`RowGroupMetaData`]: Information about each Row Group (see [`Self::row_groups`])
/// * [`ParquetColumnIndex`] and [`ParquetOffsetIndex`]: Optional "Page Index" structures (see [`Self::column_index`] and [`Self::offset_index`])
///
/// This structure is read by the various readers in this crate or can be read
/// directly from a file using the [`ParquetMetaDataReader`] struct.
///
/// See the [`ParquetMetaDataBuilder`] to create and modify this structure.
///
/// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetMetaData {
@@ -190,18 +193,23 @@ impl ParquetMetaData {

/// Creates Parquet metadata from file metadata, a list of row
/// group metadata, and the column index structures.
#[deprecated(note = "Use ParquetMetaDataBuilder")]
pub fn new_with_page_index(
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
column_index: Option<ParquetColumnIndex>,
offset_index: Option<ParquetOffsetIndex>,
) -> Self {
ParquetMetaData {
file_metadata,
row_groups,
column_index,
offset_index,
}
ParquetMetaDataBuilder::new(file_metadata)
.set_row_groups(row_groups)
.set_column_index(column_index)
.set_offset_index(offset_index)
.build()
}

/// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`]
pub fn into_builder(self) -> ParquetMetaDataBuilder {
self.into()
}

/// Returns file metadata as reference.
@@ -290,6 +298,127 @@ impl ParquetMetaData {
}
}

/// A builder for creating / manipulating [`ParquetMetaData`]
///
/// # Example creating a new [`ParquetMetaData`]
///
///```no_run
/// # use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataBuilder, RowGroupMetaData, RowGroupMetaDataBuilder};
/// # fn get_file_metadata() -> FileMetaData { unimplemented!(); }
/// // Create a new builder given the file metadata
/// let file_metadata = get_file_metadata();
/// // Create a row group
/// let row_group = RowGroupMetaData::builder(file_metadata.schema_descr_ptr())
/// .set_num_rows(100)
/// // ... (A real row group needs more than just the number of rows)
/// .build()
/// .unwrap();
/// // Create the final metadata
/// let metadata: ParquetMetaData = ParquetMetaDataBuilder::new(file_metadata)
/// .add_row_group(row_group)
/// .build();
/// ```
///
/// # Example modifying an existing [`ParquetMetaData`]
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaData;
/// # fn load_metadata() -> ParquetMetaData { unimplemented!(); }
/// // Modify the metadata so only the last RowGroup remains
/// let metadata: ParquetMetaData = load_metadata();
/// let mut builder = metadata.into_builder();
///
/// // Take existing row groups to modify
/// let mut row_groups = builder.take_row_groups();
/// let last_row_group = row_groups.pop().unwrap();
///
/// let metadata = builder
/// .add_row_group(last_row_group)
/// .build();
/// ```
pub struct ParquetMetaDataBuilder(ParquetMetaData);

impl ParquetMetaDataBuilder {
/// Create a new builder from a file metadata, with no row groups
pub fn new(file_meta_data: FileMetaData) -> Self {
Self(ParquetMetaData::new(file_meta_data, vec![]))
}

/// Create a new builder from an existing ParquetMetaData
pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
Self(metadata)
}

/// Adds a row group to the metadata
pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self {
self.0.row_groups.push(row_group);
self
}

/// Sets all the row groups to the specified list
pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self {
self.0.row_groups = row_groups;
self
}

/// Takes ownership of the row groups in this builder, and clears the list
/// of row groups.
///
/// This can be used for more efficient creation of a new ParquetMetaData
/// from an existing one.
pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
std::mem::take(&mut self.0.row_groups)
}

/// Return a reference to the current row groups
pub fn row_groups(&self) -> &[RowGroupMetaData] {
&self.0.row_groups
}

/// Sets the column index
pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self {
self.0.column_index = column_index;
self
}

/// Returns the current column index from the builder, replacing it with `None`
pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
std::mem::take(&mut self.0.column_index)
}

/// Return a reference to the current column index, if any
pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
self.0.column_index.as_ref()
}

/// Sets the offset index
pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self {
self.0.offset_index = offset_index;
self
}

/// Returns the current offset index from the builder, replacing it with `None`
pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
std::mem::take(&mut self.0.offset_index)
}

/// Return a reference to the current offset index, if any
pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
self.0.offset_index.as_ref()
}

/// Creates a new ParquetMetaData from the builder
pub fn build(self) -> ParquetMetaData {
let Self(metadata) = self;
metadata
}
}

impl From<ParquetMetaData> for ParquetMetaDataBuilder {
fn from(meta_data: ParquetMetaData) -> Self {
Self(meta_data)
}
}

pub type KeyValue = crate::format::KeyValue;

/// Reference counted pointer for [`FileMetaData`].
@@ -566,12 +695,27 @@ impl RowGroupMetaDataBuilder {
self
}

/// Takes ownership of the column metadata in this builder, and clears
/// the list of columns.
///
/// This can be used for more efficient creation of a new RowGroupMetaData
/// from an existing one.
pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
std::mem::take(&mut self.0.columns)
}

/// Sets column metadata for this row group.
pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
self.0.columns = value;
self
}

/// Adds a column metadata to this row group
pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
self.0.columns.push(value);
self
}

/// Sets ordinal for this row group.
pub fn set_ordinal(mut self, value: i16) -> Self {
self.0.ordinal = Some(value);
@@ -1672,7 +1816,9 @@ mod tests {
.unwrap();
let row_group_meta_with_stats = vec![row_group_meta_with_stats];

let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta_with_stats);
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
.set_row_groups(row_group_meta_with_stats)
.build();
let base_expected_size = 2312;

assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -1692,14 +1838,13 @@
offset_index.append_unencoded_byte_array_data_bytes(Some(10));
let offset_index = offset_index.build_to_thrift();

let parquet_meta = ParquetMetaData::new_with_page_index(
file_metadata,
row_group_meta,
Some(vec![vec![Index::BOOLEAN(native_index)]]),
Some(vec![vec![
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
.set_row_groups(row_group_meta)
.set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]]))
.set_offset_index(Some(vec![vec![
OffsetIndexMetaData::try_new(offset_index).unwrap()
]]),
);
]]))
.build();

let bigger_expected_size = 2816;
// more set fields means more memory usage
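
The `take_*` accessors added above are intended to let callers rearrange metadata without cloning. As one possible use, a hedged sketch that truncates a `ParquetMetaData` to its first `n` row groups while keeping any loaded page index aligned (the helper is illustrative, not a crate API):

```rust
use parquet::file::metadata::ParquetMetaData;

// Hypothetical helper: keep only the first `n` row groups and truncate the
// page index to match. Uses only methods shown in this commit
// (`into_builder`, `take_*`, `set_*`, `build`).
fn truncate_row_groups(metadata: ParquetMetaData, n: usize) -> ParquetMetaData {
    let mut builder = metadata.into_builder();

    let mut row_groups = builder.take_row_groups();
    row_groups.truncate(n);

    // The column and offset indexes are indexed by row group, so truncate
    // them the same way to keep everything consistent.
    let column_index = builder.take_column_index().map(|mut ci| {
        ci.truncate(n);
        ci
    });
    let offset_index = builder.take_offset_index().map(|mut oi| {
        oi.truncate(n);
        oi
    });

    builder
        .set_row_groups(row_groups)
        .set_column_index(column_index)
        .set_offset_index(offset_index)
        .build()
}
```
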
43 changes: 17 additions & 26 deletions parquet/src/file/serialized_reader.rs
@@ -191,11 +191,13 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
/// Creates file reader from a Parquet file with read options.
/// Returns error if Parquet file does not exist or is corrupt.
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
let mut metadata_builder = ParquetMetaDataReader::new()
.parse_and_finish(&chunk_reader)?
.into_builder();
let mut predicates = options.predicates;
let row_groups = metadata.row_groups().to_vec();
let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
for (i, rg_meta) in row_groups.into_iter().enumerate() {

// Filter row groups based on the predicates
for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
let mut keep = true;
for predicate in &mut predicates {
if !predicate(&rg_meta, i) {
@@ -204,41 +206,30 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
}
}
if keep {
filtered_row_groups.push(rg_meta);
metadata_builder = metadata_builder.add_row_group(rg_meta);
}
}

if options.enable_page_index {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in &mut filtered_row_groups {
for rg in metadata_builder.row_groups().iter() {
let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
filtered_row_groups,
Some(columns_indexes),
Some(offset_indexes),
)),
props: Arc::new(options.props),
})
} else {
Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(ParquetMetaData::new(
metadata.file_metadata().clone(),
filtered_row_groups,
)),
props: Arc::new(options.props),
})
metadata_builder = metadata_builder
.set_column_index(Some(columns_indexes))
.set_offset_index(Some(offset_indexes));
}

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(metadata_builder.build()),
props: Arc::new(options.props),
})
}
}

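
The rewritten `new_with_options` above drains the row groups out of the builder, applies the read-option predicates, and re-adds the survivors. A standalone sketch of that filtering pattern; the helper and its predicate signature are assumptions for illustration, not crate APIs:

```rust
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};

// Hypothetical helper mirroring the filtering loop in new_with_options:
// row groups are drained from the builder, tested, and re-added.
fn prune_row_groups<F>(metadata: ParquetMetaData, mut keep: F) -> ParquetMetaData
where
    F: FnMut(&RowGroupMetaData, usize) -> bool,
{
    let mut builder = metadata.into_builder();

    // `take_row_groups` hands back the owned Vec, so nothing is cloned.
    for (i, rg) in builder.take_row_groups().into_iter().enumerate() {
        if keep(&rg, i) {
            builder = builder.add_row_group(rg);
        }
    }

    // Note: if a page index was already loaded it is left untouched here;
    // the reader in this commit instead rebuilds it after filtering.
    builder.build()
}
```
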
