-
Couldn't load subscription status.
- Fork 1.7k
Update to arrow/parquet 11.0 #2048
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,9 +53,8 @@ use arrow::{ | |
| use log::{debug, warn}; | ||
| use parquet::arrow::ArrowWriter; | ||
| use parquet::file::{ | ||
| metadata::RowGroupMetaData, | ||
| reader::{FileReader, SerializedFileReader}, | ||
| statistics::Statistics as ParquetStatistics, | ||
| metadata::RowGroupMetaData, reader::SerializedFileReader, | ||
| serialized_reader::ReadOptionsBuilder, statistics::Statistics as ParquetStatistics, | ||
| }; | ||
|
|
||
| use fmt::Debug; | ||
|
|
@@ -309,7 +308,7 @@ fn send_result( | |
| /// Wraps parquet statistics in a way | ||
| /// that implements [`PruningStatistics`] | ||
| struct RowGroupPruningStatistics<'a> { | ||
| row_group_metadata: &'a [RowGroupMetaData], | ||
| row_group_metadata: &'a RowGroupMetaData, | ||
| parquet_schema: &'a Schema, | ||
| } | ||
|
|
||
|
|
@@ -342,33 +341,26 @@ macro_rules! get_statistic { | |
| // Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate | ||
| macro_rules! get_min_max_values { | ||
| ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{ | ||
| let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) { | ||
| (v, f) | ||
| } else { | ||
| // Named column was not present | ||
| return None | ||
| }; | ||
| let (column_index, field) = | ||
| if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) { | ||
| (v, f) | ||
| } else { | ||
| // Named column was not present | ||
| return None; | ||
| }; | ||
|
|
||
| let data_type = field.data_type(); | ||
| // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type | ||
| let null_scalar: ScalarValue = data_type.try_into().ok()?; | ||
|
|
||
| let scalar_values : Vec<ScalarValue> = $self.row_group_metadata | ||
| .iter() | ||
| .flat_map(|meta| { | ||
| meta.column(column_index).statistics() | ||
| }) | ||
| .map(|stats| { | ||
| get_statistic!(stats, $func, $bytes_func) | ||
| }) | ||
| .map(|maybe_scalar| { | ||
| // column either did't have statistics at all or didn't have min/max values | ||
| maybe_scalar.unwrap_or_else(|| null_scalar.clone()) | ||
| }) | ||
| .collect(); | ||
|
|
||
| // ignore errors converting to arrays (e.g. different types) | ||
| ScalarValue::iter_to_array(scalar_values).ok() | ||
| $self.row_group_metadata | ||
| .column(column_index) | ||
| .statistics() | ||
| .map(|stats| get_statistic!(stats, $func, $bytes_func)) | ||
| .flatten() | ||
| // column either didn't have statistics at all or didn't have min/max values | ||
| .or_else(|| Some(null_scalar.clone())) | ||
| .map(|s| s.to_array()) | ||
| }} | ||
| } | ||
|
|
||
|
|
@@ -383,17 +375,14 @@ macro_rules! get_null_count_values { | |
| return None; | ||
| }; | ||
|
|
||
| let scalar_values: Vec<ScalarValue> = $self | ||
| .row_group_metadata | ||
| .iter() | ||
| .flat_map(|meta| meta.column(column_index).statistics()) | ||
| .map(|stats| { | ||
| ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap())) | ||
| }) | ||
| .collect(); | ||
|
|
||
| // ignore errors converting to arrays (e.g. different types) | ||
| ScalarValue::iter_to_array(scalar_values).ok() | ||
| let value = ScalarValue::UInt64( | ||
| $self | ||
| .row_group_metadata | ||
| .column(column_index) | ||
| .statistics() | ||
| .map(|s| s.null_count()), | ||
| ); | ||
| Some(value.to_array()) | ||
| }}; | ||
| } | ||
|
|
||
|
|
@@ -407,7 +396,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { | |
| } | ||
|
|
||
| fn num_containers(&self) -> usize { | ||
| self.row_group_metadata.len() | ||
| 1 | ||
| } | ||
|
|
||
| fn null_counts(&self, column: &Column) -> Option<ArrayRef> { | ||
|
|
@@ -418,31 +407,33 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { | |
| fn build_row_group_predicate( | ||
| pruning_predicate: &PruningPredicate, | ||
| metrics: ParquetFileMetrics, | ||
| row_group_metadata: &[RowGroupMetaData], | ||
| ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> { | ||
| let parquet_schema = pruning_predicate.schema().as_ref(); | ||
|
|
||
| let pruning_stats = RowGroupPruningStatistics { | ||
| row_group_metadata, | ||
| parquet_schema, | ||
| }; | ||
| let predicate_values = pruning_predicate.prune(&pruning_stats); | ||
|
|
||
| match predicate_values { | ||
| Ok(values) => { | ||
| // NB: false means don't scan row group | ||
| let num_pruned = values.iter().filter(|&v| !*v).count(); | ||
| metrics.row_groups_pruned.add(num_pruned); | ||
| Box::new(move |_, i| values[i]) | ||
| } | ||
| // stats filter array could not be built | ||
| // return a closure which will not filter out any row groups | ||
| Err(e) => { | ||
| debug!("Error evaluating row group predicate values {}", e); | ||
| metrics.predicate_evaluation_errors.add(1); | ||
| Box::new(|_r, _i| true) | ||
| } | ||
| } | ||
| ) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool> { | ||
| let pruning_predicate = pruning_predicate.clone(); | ||
| Box::new( | ||
| move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool { | ||
| let parquet_schema = pruning_predicate.schema().as_ref(); | ||
| let pruning_stats = RowGroupPruningStatistics { | ||
| row_group_metadata, | ||
| parquet_schema, | ||
| }; | ||
| let predicate_values = pruning_predicate.prune(&pruning_stats); | ||
|
[Review comment] there is probably some overhead here related to calling

[Review comment] Yeah... I just stumbled across this whilst updating #1617 - in IOx we found the prune method had non-trivial overheads when run in a non-columnar fashion as this is doing. Admittedly that was likely with more containers than there are likely to be row groups in a file. I do wonder if we need to take a step back from extending the parquet arrow-rs interface, and take a more holistic look at what the desired end-state should be. I worry a bit that we're painting ourselves into a corner, I'll see if I can get my thoughts into some tickets

[Review comment] How about we change ReadOptions like:

    pub struct ReadOptions {
        predicates: Vec<Box<dyn Fn(&[RowGroupMetaData]) -> Vec<bool>>>,
    }

[Review comment] That would definitely be one option, but I'm not sure why it needs to be lazy.
||
| match predicate_values { | ||
| Ok(values) => { | ||
| // NB: false means don't scan row group | ||
| let num_pruned = values.iter().filter(|&v| !*v).count(); | ||
| metrics.row_groups_pruned.add(num_pruned); | ||
| values[0] | ||
| } | ||
| // stats filter array could not be built | ||
| // return a closure which will not filter out any row groups | ||
| Err(e) => { | ||
| debug!("Error evaluating row group predicate values {}", e); | ||
| metrics.predicate_evaluation_errors.add(1); | ||
| true | ||
| } | ||
| } | ||
| }, | ||
| ) | ||
| } | ||
|
|
||
| #[allow(clippy::too_many_arguments)] | ||
|
|
@@ -470,17 +461,20 @@ fn read_partition( | |
| ); | ||
| let object_reader = | ||
| object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?; | ||
| let mut file_reader = | ||
| SerializedFileReader::new(ChunkObjectReader(object_reader))?; | ||
|
|
||
| let mut opt = ReadOptionsBuilder::new(); | ||
| if let Some(pruning_predicate) = pruning_predicate { | ||
| let row_group_predicate = build_row_group_predicate( | ||
| opt = opt.with_predicate(build_row_group_predicate( | ||
| pruning_predicate, | ||
| file_metrics, | ||
| file_reader.metadata().row_groups(), | ||
| ); | ||
| file_reader.filter_row_groups(&row_group_predicate); | ||
| )); | ||
| } | ||
|
|
||
| let file_reader = SerializedFileReader::new_with_options( | ||
| ChunkObjectReader(object_reader), | ||
| opt.build(), | ||
| )?; | ||
|
|
||
| let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); | ||
| let adapted_projections = | ||
| schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?; | ||
|
|
@@ -1054,11 +1048,8 @@ mod tests { | |
| vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], | ||
| ); | ||
| let row_group_metadata = vec![rgm1, rgm2]; | ||
| let row_group_predicate = build_row_group_predicate( | ||
| &pruning_predicate, | ||
| parquet_file_metrics(), | ||
| &row_group_metadata, | ||
| ); | ||
| let mut row_group_predicate = | ||
| build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); | ||
| let row_group_filter = row_group_metadata | ||
| .iter() | ||
| .enumerate() | ||
|
|
@@ -1087,11 +1078,8 @@ mod tests { | |
| vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], | ||
| ); | ||
| let row_group_metadata = vec![rgm1, rgm2]; | ||
| let row_group_predicate = build_row_group_predicate( | ||
| &pruning_predicate, | ||
| parquet_file_metrics(), | ||
| &row_group_metadata, | ||
| ); | ||
| let mut row_group_predicate = | ||
| build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); | ||
| let row_group_filter = row_group_metadata | ||
| .iter() | ||
| .enumerate() | ||
|
|
@@ -1135,11 +1123,8 @@ mod tests { | |
| ], | ||
| ); | ||
| let row_group_metadata = vec![rgm1, rgm2]; | ||
| let row_group_predicate = build_row_group_predicate( | ||
| &pruning_predicate, | ||
| parquet_file_metrics(), | ||
| &row_group_metadata, | ||
| ); | ||
| let mut row_group_predicate = | ||
| build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); | ||
| let row_group_filter = row_group_metadata | ||
| .iter() | ||
| .enumerate() | ||
|
|
@@ -1153,11 +1138,8 @@ mod tests { | |
| // this bypasses the entire predicate expression and no row groups are filtered out | ||
| let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); | ||
| let pruning_predicate = PruningPredicate::try_new(expr, schema)?; | ||
| let row_group_predicate = build_row_group_predicate( | ||
| &pruning_predicate, | ||
| parquet_file_metrics(), | ||
| &row_group_metadata, | ||
| ); | ||
| let mut row_group_predicate = | ||
| build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); | ||
| let row_group_filter = row_group_metadata | ||
| .iter() | ||
| .enumerate() | ||
|
|
@@ -1202,11 +1184,8 @@ mod tests { | |
| let pruning_predicate = PruningPredicate::try_new(expr, schema)?; | ||
| let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); | ||
|
|
||
| let row_group_predicate = build_row_group_predicate( | ||
| &pruning_predicate, | ||
| parquet_file_metrics(), | ||
| &row_group_metadata, | ||
| ); | ||
| let mut row_group_predicate = | ||
| build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); | ||
| let row_group_filter = row_group_metadata | ||
| .iter() | ||
| .enumerate() | ||
|
|
@@ -1234,11 +1213,8 @@ mod tests { | |
| let pruning_predicate = PruningPredicate::try_new(expr, schema)?; | ||
| let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate(); | ||
|
|
||
| let row_group_predicate = build_row_group_predicate( | ||
| &pruning_predicate, | ||
| parquet_file_metrics(), | ||
| &row_group_metadata, | ||
| ); | ||
| let mut row_group_predicate = | ||
| build_row_group_predicate(&pruning_predicate, parquet_file_metrics()); | ||
| let row_group_filter = row_group_metadata | ||
| .iter() | ||
| .enumerate() | ||
|
|
||
[Review comment] 👍