poc
coastalwhite committed Aug 1, 2024
1 parent 4fb6faa commit f6b1543
Showing 8 changed files with 251 additions and 14 deletions.
5 changes: 5 additions & 0 deletions crates/polars-expr/src/expressions/mod.rs
@@ -613,6 +613,11 @@ impl PhysicalIoExpr for PhysicalIoHelper {
self.expr.evaluate(df, &state)
}

fn live_variables(&self) -> PolarsResult<Vec<Arc<str>>> {
// @TODO: This should not unwrap
Ok(expr_to_leaf_column_names(self.expr.as_expression().unwrap()))
}

#[cfg(feature = "parquet")]
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
self.expr.as_stats_evaluator()
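For context, `live_variables` reports the leaf column names a predicate reads, which is what `expr_to_leaf_column_names` already collects; a rough illustration of the intent (sketch only; imports and exact module paths omitted):

    // Hypothetical predicate: col("a") > 1 AND col("b") IS NOT NULL.
    let predicate = col("a").gt(lit(1)).and(col("b").is_not_null());
    // expr_to_leaf_column_names walks the expression tree and returns its leaf columns,
    // so the live variables here are "a" and "b".
    let live: Vec<Arc<str>> = expr_to_leaf_column_names(&predicate);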
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/read/mmap.rs
@@ -65,7 +65,7 @@ fn _mmap_single_column<'a>(
pub(super) fn to_deserializer<'a>(
columns: Vec<(&ColumnChunkMetaData, MemSlice)>,
field: Field,
num_rows: usize,
filter: Option<Filter>,
) -> PolarsResult<ArrayIter<'a>> {
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
@@ -87,5 +87,5 @@ pub(super) fn to_deserializer<'a>(
})
.unzip();

column_iter_to_arrays(columns, types, field, Some(Filter::new_limited(num_rows)))
column_iter_to_arrays(columns, types, field, filter)
}
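With the hard `num_rows` limit replaced by an `Option<Filter>`, callers decide how much of a row group to decode; the two filter shapes used later in this commit, as a sketch:

    // Old behaviour: decode at most the first `num_rows` rows of the row group.
    let filter = Some(Filter::new_limited(num_rows));
    // New behaviour: decode only the rows whose bit is set in a precomputed mask.
    let filter = Some(Filter::new_masked(mask.clone()));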
239 changes: 228 additions & 11 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -3,13 +3,16 @@ use std::collections::VecDeque;
use std::ops::{Deref, Range};

use arrow::array::new_empty_array;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::ArrowSchemaRef;
use arrow::pushable::Pushable;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData};
use polars_parquet::read::{self, ArrayIter, FileMetaData, Filter, PhysicalType, RowGroupMetaData};
use polars_utils::mmap::MemSlice;
use rayon::prelude::*;
use simd_json::prelude::ArrayMut;

#[cfg(feature = "cloud")]
use super::async_impl::FetchRowGroupsFromObjectStore;
@@ -54,7 +57,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
fn column_idx_to_series(
column_i: usize,
md: &RowGroupMetaData,
remaining_rows: usize,
filter: Option<Filter>,
file_schema: &ArrowSchema,
store: &mmap::ColumnStore,
) -> PolarsResult<Series> {
@@ -66,13 +69,13 @@ fn column_idx_to_series(
}

let columns = mmap_columns(store, md.columns(), &field.name);
let iter = mmap::to_deserializer(columns, field.clone(), remaining_rows)?;
let iter = mmap::to_deserializer(columns, field.clone(), filter)?;

let mut series = if remaining_rows < md.num_rows() {
array_iter_to_series(iter, field, Some(remaining_rows))
} else {
array_iter_to_series(iter, field, None)
}?;
// let mut series = if remaining_rows < md.num_rows() {
// array_iter_to_series(iter, field, Some(remaining_rows))
// } else {
let mut series = array_iter_to_series(iter, field, None)?;
// }?;

// See if we can find some statistics for this series. If we cannot find anything just return
// the series as is.
@@ -166,6 +169,25 @@ fn rg_to_dfs(
use_statistics: bool,
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
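// Experimental path: when the POLARS_NEW_PARQUET environment variable is set and a
// predicate is available, use the predicate-filtered reader below instead of the existing
// per-column / per-row-group strategies.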
if std::env::var("POLARS_NEW_PARQUET").is_ok() {
if let Some(predicate) = predicate {
return rg_to_dfs_new(
store,
previous_row_count,
row_group_start,
row_group_end,
file_metadata,
schema,
predicate,
row_index,
parallel,
projection,
use_statistics,
hive_partition_columns,
);
}
}

if let ParallelStrategy::Columns | ParallelStrategy::None = parallel {
rg_to_dfs_optionally_par_over_columns(
store,
@@ -200,6 +222,183 @@ fn rg_to_dfs(
}
}

#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_new(
store: &mmap::ColumnStore,
previous_row_count: &mut IdxSize,
row_group_start: usize,
row_group_end: usize,
file_metadata: &FileMetaData,
schema: &ArrowSchemaRef,
predicate: &dyn PhysicalIoExpr,
row_index: Option<RowIndex>,
parallel: ParallelStrategy,
projection: &[usize],
use_statistics: bool,
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
let num_row_groups = row_group_end - row_group_start;

let mut row_offset = *previous_row_count;
let mut included_row_groups = Vec::with_capacity(num_row_groups);
let mut row_group_offsets = Vec::with_capacity(num_row_groups);

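// Record the starting row offset of every row group and, when statistics are enabled,
// skip row groups that the predicate proves cannot contain matching rows.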
for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];

let current_offset = row_offset;
let current_row_count = md.num_rows() as IdxSize;
row_offset += current_row_count;

row_group_offsets.push(current_offset);

if use_statistics
&& !read_this_row_group(Some(predicate), &file_metadata.row_groups[rg_idx], schema)?
{
continue;
}

included_row_groups.push(rg_idx);
}

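// Determine which columns the predicate actually reads ("live" columns) and map their
// names to column indices in the parquet schema.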
let live_variables = predicate.live_variables()?;

dbg!(&live_variables);

let mut live_indices = Vec::with_capacity(live_variables.len());
let live_variables = live_variables
.iter()
.map(Deref::deref)
.collect::<PlHashSet<_>>();

for (i, col) in file_metadata.schema().columns().iter().enumerate() {
if live_variables.contains(col.path_in_schema[0].deref()) {
live_indices.push(i);
}
}
debug_assert_eq!(live_variables.len(), live_indices.len());

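// First pass: decode only the live columns, in parallel over flattened
// (row group, live column) pairs; each live column is read in full for its row group.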
let pred_columns = POOL.install(|| {
(0..included_row_groups.len() * live_indices.len())
.into_par_iter()
.map(|i| {
let col_idx = live_indices[i % live_indices.len()];
let rg_idx = included_row_groups[i / live_indices.len()];

let md = &file_metadata.row_groups[rg_idx];
let num_rows = md.num_rows();
column_idx_to_series(
col_idx,
md,
Some(Filter::new_limited(num_rows)),
schema,
store,
)
})
.collect::<PolarsResult<Vec<_>>>()
})?;

let mut df_columns = vec![Vec::with_capacity(live_indices.len()); num_row_groups];
for (i, col) in pred_columns.into_iter().enumerate() {
df_columns[i / live_indices.len()].push(col);
}

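// Evaluate the predicate on the live columns of each row group, keep the boolean mask as
// a Bitmap, and drop row groups whose mask selects no rows.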
let dfs = df_columns
.into_par_iter()
.enumerate()
.map(|(i, columns)| {
let rg_idx = included_row_groups[i];

let md = &file_metadata.row_groups[rg_idx];
let mut df = unsafe { DataFrame::new_no_checks(columns) };
if let Some(rc) = &row_index {
df.with_row_index_mut(&rc.name, Some(row_group_offsets[rg_idx - row_group_start]));
}

materialize_hive_partitions(
&mut df,
schema.as_ref(),
hive_partition_columns,
md.num_rows(),
);
let s = predicate.evaluate_io(&df)?;
let mask = s.bool().expect("filter predicate was not of type boolean");
df = df.filter(mask)?;

let mut bitmap = MutableBitmap::with_capacity(mask.len());

for chunk in mask.downcast_iter() {
bitmap.extend_from_bitmap(chunk.values());
}

let bitmap = bitmap.freeze();

debug_assert_eq!(df.height(), bitmap.set_bits());
Ok((bitmap, df))
})
.filter(|v| v.as_ref().is_ok_and(|(bm, _)| bm.set_bits() > 0))
.collect::<PolarsResult<Vec<(Bitmap, DataFrame)>>>()?;

// @TODO
// *previous_row_count += df.height() as IdxSize;

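// Second pass: decode the remaining projected columns, pushing each row group's mask into
// the reader via Filter::new_masked so that only surviving rows are materialized.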
let num_unloaded_columns = projection.len() - live_indices.len();
let rg_columns = POOL.install(|| {
(0..dfs.len() * num_unloaded_columns)
.into_par_iter()
.map(|i| {
// @TODO: This delta is really slow and stupid
let mut col_idx = i % num_unloaded_columns;
for i in live_indices.iter().copied() {
col_idx += usize::from(i <= col_idx);
}
let rg_idx = included_row_groups[i / num_unloaded_columns];

let (mask, _) = &dfs[i / num_unloaded_columns];

let md = &file_metadata.row_groups[rg_idx];
column_idx_to_series(
col_idx,
md,
Some(Filter::new_masked(mask.clone())),
schema,
store,
)
})
.collect::<PolarsResult<Vec<_>>>()
})?;

let mut rg_columns = rg_columns;
let mut dfs = dfs;

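// Attach the second-pass columns to the filtered frames; the trailing select restores the
// schema's column order.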
let mut invalid_schema: Schema = Schema::new();
for i in live_indices.iter().copied() {
invalid_schema.insert_at_index(
invalid_schema.len(),
schema.fields[i].name.clone().into(),
schema.fields[i].data_type().into(),
)?;
}
invalid_schema.merge(Schema::from(schema.as_ref()));
for df_idx in 0..dfs.len() {
dfs[df_idx].1._add_columns(
rg_columns.drain(0..num_unloaded_columns).collect(),
&invalid_schema,
)?;
}

#[cfg(debug_assertions)]
{
for cols in &dfs {
assert_eq!(cols.1.width(), projection.len());
}
}

dfs.into_iter()
.map(|(_, df)| df.select(schema.get_names()))
.collect()
}
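Both `POOL.install` loops above iterate over a flattened (row group, column) index space; a minimal standalone sketch of the index arithmetic they rely on (plain Rust, nothing Polars-specific):

    // With live columns at schema indices 1 and 3 and row groups 0 and 2 included, the
    // flat index i visits the pairs row-group-major: (0, 1), (0, 3), (2, 1), (2, 3).
    let live_indices = [1usize, 3];
    let included_row_groups = [0usize, 2];
    for i in 0..included_row_groups.len() * live_indices.len() {
        let col_idx = live_indices[i % live_indices.len()];
        let rg_idx = included_row_groups[i / live_indices.len()];
        println!("i={i} -> row group {rg_idx}, column {col_idx}");
    }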

#[allow(clippy::too_many_arguments)]
// might parallelize over columns
fn rg_to_dfs_optionally_par_over_columns(
@@ -241,15 +440,27 @@ fn rg_to_dfs_optionally_par_over_columns(
projection
.par_iter()
.map(|column_i| {
column_idx_to_series(*column_i, md, projection_height, schema, store)
column_idx_to_series(
*column_i,
md,
Some(Filter::new_limited(projection_height)),
schema,
store,
)
})
.collect::<PolarsResult<Vec<_>>>()
})?
} else {
projection
.iter()
.map(|column_i| {
column_idx_to_series(*column_i, md, projection_height, schema, store)
column_idx_to_series(
*column_i,
md,
Some(Filter::new_limited(projection_height)),
schema,
store,
)
})
.collect::<PolarsResult<Vec<_>>>()?
};
@@ -336,7 +547,13 @@ fn rg_to_dfs_par_over_rg(
let columns = projection
.iter()
.map(|column_i| {
column_idx_to_series(*column_i, md, projection_height, schema, store)
column_idx_to_series(
*column_i,
md,
Some(Filter::new_limited(projection_height)),
schema,
store,
)
})
.collect::<PolarsResult<Vec<_>>>()?;

2 changes: 2 additions & 0 deletions crates/polars-io/src/predicates.rs
@@ -7,6 +7,8 @@ pub trait PhysicalIoExpr: Send + Sync {
/// as a predicate mask
fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series>;

fn live_variables(&self) -> PolarsResult<Vec<Arc<str>>>;

/// Can take &dyn Statistics and determine if a file should be
/// read -> `true`
/// or not -> `false`
@@ -26,6 +26,10 @@ impl PhysicalIoExpr for Wrap {
};
h.evaluate_io(df)
}
fn live_variables(&self) -> PolarsResult<Vec<Arc<str>>> {
// @TODO: This should not unwrap
Ok(expr_to_leaf_column_names(self.0.as_expression().unwrap()))
}
fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
self.0.as_stats_evaluator()
}
@@ -150,7 +150,7 @@ impl<'a, D: Decoder> State<'a, D> {
num_ones,
)?;

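// Stop once the value iterator is exhausted, not only when the requested length is reached.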
if self.len() == 0 {
if iter.num_remaining() == 0 || self.len() == 0 {
break;
}

@@ -30,6 +30,10 @@ impl PhysicalIoExpr for Len {
fn evaluate_io(&self, _df: &DataFrame) -> PolarsResult<Series> {
unimplemented!()
}

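// A bare length expression references no columns, so there are no live variables to report.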
fn live_variables(&self) -> PolarsResult<Vec<Arc<str>>> {
Ok(vec![])
}
}
impl PhysicalPipedExpr for Len {
fn evaluate(&self, chunk: &DataChunk, _lazy_state: &ExecutionState) -> PolarsResult<Series> {
5 changes: 5 additions & 0 deletions crates/polars-pipe/src/pipeline/convert.rs
@@ -130,6 +130,11 @@ where
fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series> {
self.p.evaluate_io(df)
}

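// Left unimplemented in this proof of concept.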
fn live_variables(&self) -> PolarsResult<Vec<Arc<str>>> {
todo!()
}

fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
self.p.as_stats_evaluator()
}
