Handle partitions in polars-io
ritchie46 committed Aug 23, 2024
1 parent fbee7fb commit 7053a6c
Showing 7 changed files with 105 additions and 78 deletions.
1 change: 1 addition & 0 deletions crates/polars-io/Cargo.toml
@@ -28,6 +28,7 @@ fast-float = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
glob = { version = "0.3" }
hashbrown = { workspace = true }
itoa = { workspace = true, optional = true }
memchr = { workspace = true }
memmap = { workspace = true }
1 change: 1 addition & 0 deletions crates/polars-io/src/lib.rs
@@ -1,6 +1,7 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(feature = "simd", feature(portable_simd))]
#![allow(ambiguous_glob_reexports)]
extern crate core;

#[cfg(feature = "avro")]
pub mod avro;
52 changes: 52 additions & 0 deletions crates/polars-io/src/parquet/read/metadata.rs
@@ -0,0 +1,52 @@
use hashbrown::hash_map::RawEntryMut;
use polars_parquet::read::{ColumnChunkMetaData, RowGroupMetaData};
use polars_utils::aliases::{PlHashMap, PlHashSet};
use polars_utils::idx_vec::UnitVec;
use polars_utils::unitvec;

/// Maps the top-level field names of a row group to the indices of the
/// column chunks that store them, so per-column lookups do not rescan
/// the whole row group.
pub(super) struct ColumnToColumnChunkMD<'a> {
    partitions: PlHashMap<String, UnitVec<usize>>,
    metadata: &'a RowGroupMetaData,
}

impl<'a> ColumnToColumnChunkMD<'a> {
    pub(super) fn new(metadata: &'a RowGroupMetaData) -> Self {
        Self {
            partitions: Default::default(),
            metadata,
        }
    }

    /// Build the partition map, restricted to `field_names` when given.
    pub(super) fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
        for (i, ccmd) in self.metadata.columns().iter().enumerate() {
            let name = &ccmd.descriptor().path_in_schema[0];
            if field_names
                .map(|field_names| field_names.contains(name.as_str()))
                .unwrap_or(true)
            {
                let entry = self.partitions.raw_entry_mut().from_key(name.as_str());

                match entry {
                    RawEntryMut::Vacant(slot) => {
                        slot.insert(name.to_string(), unitvec![i]);
                    },
                    RawEntryMut::Occupied(mut slot) => {
                        slot.get_mut().push(i);
                    },
                };
            }
        }
    }

    /// Return the column chunks belonging to the field `name`.
    pub(super) fn get_partitions(&self, name: &str) -> UnitVec<&ColumnChunkMetaData> {
        debug_assert!(
            !self.partitions.is_empty(),
            "fields should be partitioned first"
        );
        let columns = self.metadata.columns();
        self.partitions
            .get(name)
            .map(|idx| idx.iter().map(|i| &columns[*i]).collect::<UnitVec<_>>())
            .unwrap_or_default()
    }
}
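
For illustration, the grouping above can be sketched with just the standard library; the real code uses hashbrown's raw-entry API and UnitVec so the occupied path allocates neither a fresh String key nor a heap Vec in the common single-chunk case. All names below are illustrative, not part of the commit:

use std::collections::{HashMap, HashSet};

// Group column-chunk indices by their top-level field name; a `projected`
// of `None` means "keep every field", mirroring `set_partitions`.
fn partition_chunks<'a>(
    chunk_field_names: &'a [String],
    projected: Option<&HashSet<&str>>,
) -> HashMap<&'a str, Vec<usize>> {
    let mut partitions: HashMap<&'a str, Vec<usize>> = HashMap::new();
    for (i, name) in chunk_field_names.iter().enumerate() {
        if projected.map_or(true, |p| p.contains(name.as_str())) {
            partitions.entry(name.as_str()).or_default().push(i);
        }
    }
    partitions
}

fn main() {
    // A nested field ("b") can span several column chunks.
    let chunks: Vec<String> = vec!["a".into(), "b".into(), "b".into()];
    let parts = partition_chunks(&chunks, None);
    assert_eq!(parts["a"], vec![0]);
    assert_eq!(parts["b"], vec![1, 2]);
}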
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -16,6 +16,7 @@

#[cfg(feature = "cloud")]
mod async_impl;
mod metadata;
mod mmap;
mod options;
mod predicates;
79 changes: 50 additions & 29 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -9,7 +9,9 @@ use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::parquet::statistics::Statistics;
use polars_parquet::read::{self, FileMetaData, Filter, PhysicalType, RowGroupMetaData};
use polars_parquet::read::{
self, ColumnChunkMetaData, FileMetaData, Filter, PhysicalType, RowGroupMetaData,
};
use polars_utils::mmap::MemSlice;
use polars_utils::vec::inplace_zip_filtermap;
use rayon::prelude::*;
@@ -24,6 +26,7 @@ use super::{mmap, ParallelStrategy};
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetaDataRef;
use crate::parquet::read::metadata::ColumnToColumnChunkMD;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::utils::get_reader_bytes;
@@ -58,7 +61,8 @@ fn assert_dtypes(data_type: &ArrowDataType) {

fn column_idx_to_series(
column_i: usize,
md: &RowGroupMetaData,
// The metadata belonging to this column
field_md: &[&ColumnChunkMetaData],
filter: Option<Filter>,
file_schema: &ArrowSchema,
store: &mmap::ColumnStore,
@@ -69,9 +73,7 @@ fn column_idx_to_series(
{
assert_dtypes(field.data_type())
}
let parts = md.get_partition_fields(&field.name);

let columns = mmap_columns(store, parts.as_slice());
let columns = mmap_columns(store, field_md);
let stats = columns
.iter()
.map(|(col_md, _)| col_md.statistics().transpose())
@@ -299,25 +301,29 @@ fn rg_to_dfs_prefiltered(
POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
{
let part_md = {
let projected_columns = projected_columns_set(schema, projection);

(row_group_start..row_group_end)
.into_par_iter()
.for_each(|rg_idx| {
file_metadata.row_groups[rg_idx]
.set_partition_fields(projected_columns.as_ref());
});
}
.map(|rg_idx| {
let md = &file_metadata.row_groups[rg_idx];
let mut part_md = ColumnToColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());
part_md
})
.collect::<Vec<_>>()
};

// Collect the data for the live columns
let mut live_columns = (0..row_groups.len() * num_live_columns)
.into_par_iter()
.map(|i| {
let col_idx = live_idx_to_col_idx[i % num_live_columns];
let rg_idx = row_groups[i / num_live_columns].index as usize;

let md = &file_metadata.row_groups[rg_idx];
column_idx_to_series(col_idx, md, None, schema, store)
let name = &schema.fields[col_idx].name;
let md = part_md[i / num_live_columns].get_partitions(name);
column_idx_to_series(col_idx, md.as_slice(), None, schema, store)
})
.collect::<PolarsResult<Vec<_>>>()?;

@@ -421,15 +427,13 @@ fn rg_to_dfs_prefiltered(
.into_par_iter()
.map(|i| {
let col_idx = dead_idx_to_col_idx[i % num_dead_columns];
let rg_idx = row_groups[i / num_dead_columns].index as usize;

let name = &schema.fields[col_idx].name;
let field_md = part_md[i / num_dead_columns].get_partitions(name);
let (mask, _) = &dfs[i / num_dead_columns];

let md = &file_metadata.row_groups[rg_idx];
debug_assert_eq!(md.num_rows(), mask.len());
column_idx_to_series(
col_idx,
md,
field_md.as_slice(),
Some(Filter::new_masked(mask.clone())),
schema,
store,
@@ -503,15 +507,20 @@ fn rg_to_dfs_optionally_par_over_columns(

// Set partitioned fields to prevent quadratic behavior.
let projected_columns = projected_columns_set(schema, projection);
md.set_partition_fields(projected_columns.as_ref());
let mut part_md = ColumnToColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());

let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
projection
.par_iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let part = part_md.get_partitions(name);

column_idx_to_series(
*column_i,
md,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
@@ -523,9 +532,12 @@ fn rg_to_dfs_optionally_par_over_columns(
projection
.iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let part = part_md.get_partitions(name);

column_idx_to_series(
*column_i,
md,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
@@ -602,16 +614,22 @@ fn rg_to_dfs_par_over_rg(
let dfs = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
{
let part_md = {
let projected_columns = projected_columns_set(schema, projection);
row_groups.par_iter().for_each(|(_, rg, _, _)| {
rg.set_partition_fields(projected_columns.as_ref());
});
}
row_groups
.par_iter()
.map(|(_, rg, _, _)| {
let mut ccmd = ColumnToColumnChunkMD::new(rg);
ccmd.set_partitions(projected_columns.as_ref());
ccmd
})
.collect::<Vec<_>>()
};

row_groups
.into_par_iter()
.map(|(rg_idx, md, slice, row_count_start)| {
.enumerate()
.map(|(iter_idx, (rg_idx, _md, slice, row_count_start))| {
if slice.1 == 0
|| use_statistics
&& !read_this_row_group(
@@ -631,9 +649,12 @@ fn rg_to_dfs_par_over_rg(
let columns = projection
.iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let field_md = part_md[iter_idx].get_partitions(name);

column_idx_to_series(
*column_i,
md,
field_md.as_slice(),
Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
schema,
store,
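
Both parallel paths in this file share one pattern: the per-row-group partition maps are built once, up front (the "prevent quadratic behavior" comments), and a single flat parallel loop then walks all (row group, column) pairs, recovering both indices from the flat counter. The division and the modulo must use the same stride, namely the number of columns handled per row group. A std-only sketch of that arithmetic, with illustrative names; the real loops run under rayon via POOL.install:

fn main() {
    let num_row_groups = 3;
    let num_columns = 4; // stride: columns handled per row group
    let mut seen = vec![vec![false; num_columns]; num_row_groups];

    for i in 0..num_row_groups * num_columns {
        let rg_idx = i / num_columns; // which precomputed partition map
        let col_idx = i % num_columns; // which column within that row group
        seen[rg_idx][col_idx] = true;
    }

    // Every (row group, column) pair is visited exactly once.
    assert!(seen.iter().flatten().all(|&v| v));
}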
1 change: 0 additions & 1 deletion crates/polars-parquet/Cargo.toml
@@ -20,7 +20,6 @@ bytemuck = { workspace = true }
ethnum = { workspace = true }
fallible-streaming-iterator = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
hashbrown = { workspace = true }
num-traits = { workspace = true }
polars-compute = { workspace = true }
polars-error = { workspace = true }
48 changes: 0 additions & 48 deletions crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -1,10 +1,4 @@
use std::sync::{Arc, RwLock};

use hashbrown::hash_map::RawEntryMut;
use parquet_format_safe::RowGroup;
use polars_utils::aliases::{InitHashMaps, PlHashMap, PlHashSet};
use polars_utils::idx_vec::UnitVec;
use polars_utils::unitvec;
#[cfg(feature = "serde_types")]
use serde::{Deserialize, Serialize};

@@ -13,17 +7,13 @@ use super::schema_descriptor::SchemaDescriptor;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::write::ColumnOffsetsMetadata;

pub type PartitionedFields = PlHashMap<String, UnitVec<usize>>;

/// Metadata for a row group.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
pub struct RowGroupMetaData {
columns: Vec<ColumnChunkMetaData>,
num_rows: usize,
total_byte_size: usize,
#[cfg_attr(feature = "serde_types", serde(skip))]
partitioned_fields: Arc<RwLock<PartitionedFields>>,
}

impl RowGroupMetaData {
@@ -37,46 +27,9 @@ impl RowGroupMetaData {
columns,
num_rows,
total_byte_size,
partitioned_fields: Default::default(),
}
}

pub fn set_partition_fields(&self, field_names: Option<&PlHashSet<&str>>) {
let mut out = PlHashMap::new();
for (i, x) in self.columns.iter().enumerate() {
let name = &x.descriptor().path_in_schema[0];
if field_names
.map(|field_names| field_names.contains(name.as_str()))
.unwrap_or(true)
{
let entry = out.raw_entry_mut().from_key(name.as_str());

match entry {
RawEntryMut::Vacant(slot) => {
slot.insert(name.to_string(), unitvec![i]);
},
RawEntryMut::Occupied(mut slot) => {
slot.get_mut().push(i);
},
};
}
}
let mut lock = self.partitioned_fields.write().unwrap();
*lock = out;
}

pub fn get_partition_fields(&self, name: &str) -> UnitVec<&ColumnChunkMetaData> {
let pf = self.partitioned_fields.read().unwrap();
debug_assert!(!pf.is_empty(), "fields should be partitioned first");
pf.get(name)
.map(|idx| {
idx.iter()
.map(|i| &self.columns[*i])
.collect::<UnitVec<_>>()
})
.unwrap_or_default()
}

/// Returns slice of column chunk metadata.
pub fn columns(&self) -> &[ColumnChunkMetaData] {
&self.columns
Expand Down Expand Up @@ -123,7 +76,6 @@ impl RowGroupMetaData {
columns,
num_rows,
total_byte_size,
partitioned_fields: Default::default(),
})
}

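
The net effect of this file's deletions: the partition cache no longer lives inside the shared RowGroupMetaData behind an Arc<RwLock<_>> (which also forced the serde(skip) attribute and a read lock on every get_partition_fields call), but in the reader-local ColumnToColumnChunkMD that merely borrows the metadata. A sketch of that ownership shape, with illustrative types: the map is built with plain &mut before the parallel section, then shared read-only across threads, so no lock is taken per lookup:

use std::collections::HashMap;
use std::thread;

fn main() {
    let column_names = vec!["a".to_string(), "b".into(), "a".into()];

    // Build phase: unique ownership, no locking required.
    let mut partitions: HashMap<&str, Vec<usize>> = HashMap::new();
    for (i, name) in column_names.iter().enumerate() {
        partitions.entry(name).or_default().push(i);
    }

    // Read phase: shared immutably across worker threads.
    thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| assert_eq!(partitions["a"], vec![0, 2]));
        }
    });
}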
