Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ParquetMetaDataBuilder #6466

Merged
merged 7 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1556,13 +1556,16 @@ mod tests {
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
.expect("reading offset index");

let row_group_meta = metadata.row_group(0).clone();
let metadata = ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
vec![row_group_meta],
None,
Some(vec![offset_index.clone()]),
);
let mut metadata_builder = metadata.into_builder();
let mut row_groups = metadata_builder.take_row_groups();
row_groups.truncate(1);
let row_group_meta = row_groups.pop().unwrap();

let metadata = metadata_builder
.add_row_group(row_group_meta)
.set_column_index(None)
.set_offset_index(Some(vec![offset_index.clone()]))
.build();

let metadata = Arc::new(metadata);

Expand Down
173 changes: 159 additions & 14 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,16 @@ pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
/// defined by [`parquet.thrift`].
///
/// # Overview
/// The fields of this structure are:
/// * [`FileMetaData`]: Information about the overall file (such as the schema) (See [`Self::file_metadata`])
/// * [`RowGroupMetaData`]: Information about each Row Group (see [`Self::row_groups`])
/// * [`ParquetColumnIndex`] and [`ParquetOffsetIndex`]: Optional "Page Index" structures (see [`Self::column_index`] and [`Self::offset_index`])
///
/// This structure is read by the various readers in this crate or can be read
/// directly from a file using the [`ParquetMetaDataReader`] struct.
///
/// See the [`ParquetMetaDataBuilder`] to create and modify this structure.
///
/// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetMetaData {
Expand Down Expand Up @@ -190,18 +193,23 @@ impl ParquetMetaData {

/// Creates Parquet metadata from file metadata, a list of row
/// group metadata, and the column index structures.
///
/// Deprecated: this is now a thin wrapper that forwards every argument to
/// [`ParquetMetaDataBuilder`]; new code should use the builder directly.
#[deprecated(note = "Use ParquetMetaDataBuilder")]
pub fn new_with_page_index(
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
    column_index: Option<ParquetColumnIndex>,
    offset_index: Option<ParquetOffsetIndex>,
) -> Self {
    // Single tail expression: the builder chain replaces the former struct literal.
    ParquetMetaDataBuilder::new(file_metadata)
        .set_row_groups(row_groups)
        .set_column_index(column_index)
        .set_offset_index(offset_index)
        .build()
}

/// Convert this ParquetMetaData into a [`ParquetMetaDataBuilder`]
pub fn into_builder(self) -> ParquetMetaDataBuilder {
self.into()
}

/// Returns file metadata as reference.
Expand Down Expand Up @@ -290,6 +298,127 @@ impl ParquetMetaData {
}
}

/// A builder for creating / manipulating [`ParquetMetaData`]
///
/// The builder wraps a [`ParquetMetaData`] value: setters replace or extend
/// its fields, `take_*` methods move a field out of the builder, and
/// [`Self::build`] returns the wrapped metadata.
///
/// # Example creating a new [`ParquetMetaData`]
///
/// ```no_run
/// # use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataBuilder, RowGroupMetaData, RowGroupMetaDataBuilder};
/// # fn get_file_metadata() -> FileMetaData { unimplemented!(); }
/// // Create a new builder given the file metadata
/// let file_metadata = get_file_metadata();
/// // Create a row group
/// let row_group = RowGroupMetaData::builder(file_metadata.schema_descr_ptr())
///     .set_num_rows(100)
///     // ... (A real row group needs more than just the number of rows)
///     .build()
///     .unwrap();
/// // Create the final metadata
/// let metadata: ParquetMetaData = ParquetMetaDataBuilder::new(file_metadata)
///     .add_row_group(row_group)
///     .build();
/// ```
///
/// # Example modifying an existing [`ParquetMetaData`]
///
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaData;
/// # fn load_metadata() -> ParquetMetaData { unimplemented!(); }
/// // Modify the metadata so only the last RowGroup remains
/// let metadata: ParquetMetaData = load_metadata();
/// let mut builder = metadata.into_builder();
///
/// // Take existing row groups to modify
/// let mut row_groups = builder.take_row_groups();
/// let last_row_group = row_groups.pop().unwrap();
///
/// let metadata = builder
///     .add_row_group(last_row_group)
///     .build();
/// ```
pub struct ParquetMetaDataBuilder(ParquetMetaData);

impl ParquetMetaDataBuilder {
/// Create a new builder from a file metadata, with no row groups
pub fn new(file_meta_data: FileMetaData) -> Self {
Self(ParquetMetaData::new(file_meta_data, vec![]))
}

/// Create a new builder from an existing ParquetMetaData
pub fn new_from_metadata(metadata: ParquetMetaData) -> Self {
Self(metadata)
}

/// Adds a row group to the metadata
pub fn add_row_group(mut self, row_group: RowGroupMetaData) -> Self {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.0.row_groups.push(row_group);
self
}

/// Sets all the row groups to the specified list
pub fn set_row_groups(mut self, row_groups: Vec<RowGroupMetaData>) -> Self {
self.0.row_groups = row_groups;
self
}

/// Takes ownership of the row groups in this builder, and clears the list
/// of row groups.
///
/// This can be used for more efficient creation of a new ParquetMetaData
/// from an existing one.
pub fn take_row_groups(&mut self) -> Vec<RowGroupMetaData> {
std::mem::take(&mut self.0.row_groups)
}

/// Return a reference to the current row groups
pub fn row_groups(&self) -> &[RowGroupMetaData] {
alamb marked this conversation as resolved.
Show resolved Hide resolved
&self.0.row_groups
}

/// Sets the column index
pub fn set_column_index(mut self, column_index: Option<ParquetColumnIndex>) -> Self {
self.0.column_index = column_index;
self
}

/// Returns the current column index from the builder, replacing it with `None`
pub fn take_column_index(&mut self) -> Option<ParquetColumnIndex> {
std::mem::take(&mut self.0.column_index)
}

/// Return a reference to the current column index, if any
pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
self.0.column_index.as_ref()
}

/// Sets the offset index
pub fn set_offset_index(mut self, offset_index: Option<ParquetOffsetIndex>) -> Self {
self.0.offset_index = offset_index;
self
}

/// Returns the current offset index from the builder, replacing it with `None`
pub fn take_offset_index(&mut self) -> Option<ParquetOffsetIndex> {
std::mem::take(&mut self.0.offset_index)
}

/// Return a reference to the current offset index, if any
pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
self.0.offset_index.as_ref()
}

/// Creates a new ParquetMetaData from the builder
pub fn build(self) -> ParquetMetaData {
let Self(metadata) = self;
metadata
}
}

impl From<ParquetMetaData> for ParquetMetaDataBuilder {
fn from(meta_data: ParquetMetaData) -> Self {
Self(meta_data)
}
}

pub type KeyValue = crate::format::KeyValue;

/// Reference counted pointer for [`FileMetaData`].
Expand Down Expand Up @@ -566,12 +695,27 @@ impl RowGroupMetaDataBuilder {
self
}

/// Takes ownership of the column metadata in this builder, and clears
/// the list of columns.
///
/// After this call the builder holds an empty column list; the previous
/// contents are returned to the caller.
///
/// This can be used for more efficient creation of a new RowGroupMetaData
/// from an existing one.
pub fn take_columns(&mut self) -> Vec<ColumnChunkMetaData> {
std::mem::take(&mut self.0.columns)
}

/// Sets column metadata for this row group.
///
/// Any column metadata previously held by the builder is replaced by `value`.
pub fn set_column_metadata(mut self, value: Vec<ColumnChunkMetaData>) -> Self {
self.0.columns = value;
self
}

/// Adds a column metadata to this row group
///
/// The new entry is appended after any column metadata already in the builder.
pub fn add_column_metadata(mut self, value: ColumnChunkMetaData) -> Self {
self.0.columns.push(value);
self
}

/// Sets ordinal for this row group.
pub fn set_ordinal(mut self, value: i16) -> Self {
self.0.ordinal = Some(value);
Expand Down Expand Up @@ -1672,7 +1816,9 @@ mod tests {
.unwrap();
let row_group_meta_with_stats = vec![row_group_meta_with_stats];

let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta_with_stats);
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata.clone())
.set_row_groups(row_group_meta_with_stats)
.build();
let base_expected_size = 2312;

assert_eq!(parquet_meta.memory_size(), base_expected_size);
Expand All @@ -1692,14 +1838,13 @@ mod tests {
offset_index.append_unencoded_byte_array_data_bytes(Some(10));
let offset_index = offset_index.build_to_thrift();

let parquet_meta = ParquetMetaData::new_with_page_index(
file_metadata,
row_group_meta,
Some(vec![vec![Index::BOOLEAN(native_index)]]),
Some(vec![vec![
let parquet_meta = ParquetMetaDataBuilder::new(file_metadata)
.set_row_groups(row_group_meta)
.set_column_index(Some(vec![vec![Index::BOOLEAN(native_index)]]))
.set_offset_index(Some(vec![vec![
OffsetIndexMetaData::try_new(offset_index).unwrap()
]]),
);
]]))
.build();

let bigger_expected_size = 2816;
// more set fields means more memory usage
Expand Down
63 changes: 19 additions & 44 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::index_reader;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
metadata::*,
Expand Down Expand Up @@ -191,54 +190,30 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
/// Creates file reader from a Parquet file with read options.
/// Returns error if Parquet file does not exist or is corrupt.
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
let mut metadata_builder = ParquetMetaDataReader::new()
.with_page_indexes(options.enable_page_index)
alamb marked this conversation as resolved.
Show resolved Hide resolved
.parse_and_finish(&chunk_reader)?
.into_builder();

let mut predicates = options.predicates;
let row_groups = metadata.row_groups().to_vec();
let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
for (i, rg_meta) in row_groups.into_iter().enumerate() {
let mut keep = true;
for predicate in &mut predicates {
if !predicate(&rg_meta, i) {
keep = false;
break;

// Filter row groups based on the predicates
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the cleanup of this code (which is modifying the ParquetMetaData) is the best example of why having this API makes sense -- it makes one fewer copies and also I think is quite a bit clearer

if !predicates.is_empty() {
for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
if predicates
.iter_mut()
.all(|predicate| predicate(&rg_meta, i))
{
metadata_builder = metadata_builder.add_row_group(rg_meta);
}
}
if keep {
filtered_row_groups.push(rg_meta);
}
}

if options.enable_page_index {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in &mut filtered_row_groups {
let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
filtered_row_groups,
Some(columns_indexes),
Some(offset_indexes),
)),
props: Arc::new(options.props),
})
} else {
Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(ParquetMetaData::new(
metadata.file_metadata().clone(),
filtered_row_groups,
)),
props: Arc::new(options.props),
})
}
Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(metadata_builder.build()),
props: Arc::new(options.props),
})
}
}

Expand Down
Loading