Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Remove cloning of ColumnChunkMetadata #18615

Merged
2 commits were merged into the base branch on Sep 9, 2024.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ mod serde_types {
use serde_types::*;

/// Metadata for a column chunk.
// This contains the `ColumnDescriptor` associated with the chunk so that deserializers have
// access to the descriptor (e.g. physical, converted, logical).
#[derive(Debug, Clone)]
///
/// This contains the `ColumnDescriptor` associated with the chunk so that deserializers have
/// access to the descriptor (e.g. physical, converted, logical).
///
/// This struct is intentionally not `Clone`, as it is a huge struct.
#[derive(Debug)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
pub struct ColumnChunkMetadata {
#[cfg_attr(
Expand Down
21 changes: 1 addition & 20 deletions crates/polars-parquet/src/parquet/metadata/file_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use crate::parquet::thrift_format::KeyValue;
/// Metadata for a Parquet file.
// This is almost equal to [`parquet_format_safe::FileMetaData`] but contains the descriptors,
// which are crucial to deserialize pages.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct FileMetaData {
/// version of this file.
pub version: i32,
Expand Down Expand Up @@ -87,25 +87,6 @@ impl FileMetaData {
column_orders,
})
}

/// Serializes itself to thrift's [`parquet_format_safe::FileMetaData`].
pub fn into_thrift(self) -> parquet_format_safe::FileMetaData {
parquet_format_safe::FileMetaData {
version: self.version,
schema: self.schema_descr.into_thrift(),
num_rows: self.num_rows as i64,
row_groups: self
.row_groups
.into_iter()
.map(|v| v.into_thrift())
.collect(),
key_value_metadata: self.key_value_metadata,
created_by: self.created_by,
column_orders: None, // todo
encryption_algorithm: None,
footer_signing_key_metadata: None,
}
}
}

/// Parses [`ColumnOrder`] from Thrift definition.
Expand Down
30 changes: 4 additions & 26 deletions crates/polars-parquet/src/parquet/metadata/row_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use hashbrown::hash_map::RawEntryMut;
use parquet_format_safe::RowGroup;
use polars_utils::aliases::{InitHashMaps, PlHashMap};
Expand All @@ -8,7 +10,6 @@ use polars_utils::unitvec;
use super::column_chunk_metadata::{column_metadata_byte_range, ColumnChunkMetadata};
use super::schema_descriptor::SchemaDescriptor;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::write::ColumnOffsetsMetadata;

type ColumnLookup = PlHashMap<PlSmallStr, UnitVec<usize>>;

Expand All @@ -34,9 +35,8 @@ impl InitColumnLookup for ColumnLookup {

/// Metadata for a row group.
#[derive(Debug, Clone, Default)]
// #[cfg_attr(feature = "serde_types", derive(serde::Deserialize, serde::Serialize))]
pub struct RowGroupMetaData {
columns: Vec<ColumnChunkMetadata>,
columns: Arc<[ColumnChunkMetadata]>,
column_lookup: PlHashMap<PlSmallStr, UnitVec<usize>>,
num_rows: usize,
total_byte_size: usize,
Expand Down Expand Up @@ -125,7 +125,7 @@ impl RowGroupMetaData {

Ok(column)
})
.collect::<ParquetResult<Vec<_>>>()?;
.collect::<ParquetResult<Arc<[_]>>>()?;

Ok(RowGroupMetaData {
columns,
Expand All @@ -135,26 +135,4 @@ impl RowGroupMetaData {
full_byte_range,
})
}

/// Method to convert to Thrift.
pub(crate) fn into_thrift(self) -> RowGroup {
let file_offset = self
.columns
.iter()
.map(|c| {
ColumnOffsetsMetadata::from_column_chunk_metadata(c).calc_row_group_file_offset()
})
.next()
.unwrap_or(None);
let total_compressed_size = Some(self.compressed_size() as i64);
RowGroup {
columns: self.columns.into_iter().map(|v| v.into_thrift()).collect(),
total_byte_size: self.total_byte_size as i64,
num_rows: self.num_rows as i64,
sorting_columns: None,
file_offset,
total_compressed_size,
ordinal: None,
}
}
}
19 changes: 1 addition & 18 deletions crates/polars-parquet/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,7 @@ use polars_utils::mmap::MemReader;
pub use stream::read_metadata as read_metadata_async;

use crate::parquet::error::ParquetResult;
use crate::parquet::metadata::{ColumnChunkMetadata, FileMetaData, RowGroupMetaData};

/// Filters row group metadata to only those row groups,
/// for which the predicate function returns true
pub fn filter_row_groups(
metadata: &FileMetaData,
predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool,
) -> FileMetaData {
let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
for (i, row_group_metadata) in metadata.row_groups.iter().enumerate() {
if predicate(row_group_metadata, i) {
filtered_row_groups.push(row_group_metadata.clone());
}
}
let mut metadata = metadata.clone();
metadata.row_groups = filtered_row_groups;
metadata
}
use crate::parquet::metadata::ColumnChunkMetadata;

/// Returns a new [`PageReader`] by seeking `reader` to the beginning of `column_chunk`.
pub fn get_page_iterator(
Expand Down