-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rewrite bloom filters to use contains API #8442
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,28 +18,21 @@ | |
use arrow::{array::ArrayRef, datatypes::Schema}; | ||
use arrow_array::BooleanArray; | ||
use arrow_schema::FieldRef; | ||
use datafusion_common::tree_node::{TreeNode, VisitRecursion}; | ||
use datafusion_common::{Column, DataFusionError, Result, ScalarValue}; | ||
use datafusion_common::{Column, ScalarValue}; | ||
use parquet::file::metadata::ColumnChunkMetaData; | ||
use parquet::schema::types::SchemaDescriptor; | ||
use parquet::{ | ||
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder}, | ||
bloom_filter::Sbbf, | ||
file::metadata::RowGroupMetaData, | ||
}; | ||
use std::{ | ||
collections::{HashMap, HashSet}, | ||
sync::Arc, | ||
}; | ||
use std::collections::{HashMap, HashSet}; | ||
|
||
use crate::datasource::listing::FileRange; | ||
use crate::datasource::physical_plan::parquet::statistics::{ | ||
max_statistics, min_statistics, parquet_column, | ||
}; | ||
use crate::logical_expr::Operator; | ||
use crate::physical_expr::expressions as phys_expr; | ||
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; | ||
use crate::physical_plan::PhysicalExpr; | ||
|
||
use super::ParquetFileMetrics; | ||
|
||
|
@@ -118,188 +111,129 @@ pub(crate) fn prune_row_groups_by_statistics( | |
pub(crate) async fn prune_row_groups_by_bloom_filters< | ||
T: AsyncFileReader + Send + 'static, | ||
>( | ||
arrow_schema: &Schema, | ||
builder: &mut ParquetRecordBatchStreamBuilder<T>, | ||
row_groups: &[usize], | ||
groups: &[RowGroupMetaData], | ||
predicate: &PruningPredicate, | ||
metrics: &ParquetFileMetrics, | ||
) -> Vec<usize> { | ||
let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr()) | ||
{ | ||
Ok(predicates) => predicates, | ||
Err(_) => { | ||
return row_groups.to_vec(); | ||
} | ||
}; | ||
let mut filtered = Vec::with_capacity(groups.len()); | ||
for idx in row_groups { | ||
let rg_metadata = &groups[*idx]; | ||
// get all columns bloom filter | ||
let mut column_sbbf = | ||
HashMap::with_capacity(bf_predicates.required_columns.len()); | ||
for column_name in bf_predicates.required_columns.iter() { | ||
let column_idx = match rg_metadata | ||
.columns() | ||
.iter() | ||
.enumerate() | ||
.find(|(_, column)| column.column_path().string().eq(column_name)) | ||
{ | ||
Some((column_idx, _)) => column_idx, | ||
None => continue, | ||
// get all columns in the predicate that we could use a bloom filter with | ||
let literal_columns = predicate.literal_columns(); | ||
let mut column_sbbf = HashMap::with_capacity(literal_columns.len()); | ||
|
||
for column_name in literal_columns { | ||
let Some((column_idx, _field)) = | ||
parquet_column(builder.parquet_schema(), arrow_schema, &column_name) | ||
else { | ||
continue; | ||
}; | ||
|
||
let bf = match builder | ||
.get_row_group_column_bloom_filter(*idx, column_idx) | ||
.await | ||
{ | ||
Ok(bf) => match bf { | ||
Some(bf) => bf, | ||
None => { | ||
continue; | ||
} | ||
}, | ||
Ok(Some(bf)) => bf, | ||
Ok(None) => continue, // no bloom filter for this column | ||
Err(e) => { | ||
log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}"); | ||
log::debug!("Ignoring error reading bloom filter: {e}"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In general, errors in logs I think are meant to be "something bad happened that caused a user error" -- in this case the query will proceed so I think debug is a better level. If someone needs it in production logs perhaps we can change it to There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. debug level looks good to me. non-fatal errors on query path are easy to follow 👍 |
||
metrics.predicate_evaluation_errors.add(1); | ||
continue; | ||
} | ||
}; | ||
column_sbbf.insert(column_name.to_owned(), bf); | ||
column_sbbf.insert(column_name.to_string(), bf); | ||
} | ||
if bf_predicates.prune(&column_sbbf) { | ||
|
||
let stats = BloomFilterStatistics { column_sbbf }; | ||
|
||
// Can this group be pruned? | ||
let prune_group = match predicate.prune(&stats) { | ||
Ok(values) => !values[0], | ||
Err(e) => { | ||
log::debug!("Error evaluating row group predicate on bloom filter: {e}"); | ||
metrics.predicate_evaluation_errors.add(1); | ||
false | ||
} | ||
}; | ||
|
||
if prune_group { | ||
metrics.row_groups_pruned.add(1); | ||
continue; | ||
} else { | ||
filtered.push(*idx); | ||
} | ||
filtered.push(*idx); | ||
} | ||
filtered | ||
} | ||
|
||
struct BloomFilterPruningPredicate { | ||
/// Actual pruning predicate | ||
predicate_expr: Option<phys_expr::BinaryExpr>, | ||
/// The statistics required to evaluate this predicate | ||
required_columns: Vec<String>, | ||
/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF) | ||
struct BloomFilterStatistics { | ||
/// Maps column name to the parquet bloom filter | ||
column_sbbf: HashMap<String, Sbbf>, | ||
} | ||
|
||
impl BloomFilterPruningPredicate { | ||
fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> { | ||
let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>(); | ||
match binary_expr { | ||
Some(binary_expr) => { | ||
let columns = Self::get_predicate_columns(expr); | ||
Ok(Self { | ||
predicate_expr: Some(binary_expr.clone()), | ||
required_columns: columns.into_iter().collect(), | ||
}) | ||
} | ||
None => Err(DataFusionError::Execution( | ||
"BloomFilterPruningPredicate only support binary expr".to_string(), | ||
)), | ||
} | ||
impl PruningStatistics for BloomFilterStatistics { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❤ |
||
fn min_values(&self, _column: &Column) -> Option<ArrayRef> { | ||
None | ||
} | ||
|
||
fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool { | ||
Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf) | ||
fn max_values(&self, _column: &Column) -> Option<ArrayRef> { | ||
None | ||
} | ||
|
||
/// Return true if the `expr` can be proved not `true` | ||
/// based on the bloom filter. | ||
/// | ||
    /// We only check `BinaryExpr`, but `InList` is also supported | ||
    /// because the optimizer will convert `InList` to `BinaryExpr`. | ||
fn prune_expr_with_bloom_filter( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
expr: Option<&phys_expr::BinaryExpr>, | ||
column_sbbf: &HashMap<String, Sbbf>, | ||
) -> bool { | ||
let Some(expr) = expr else { | ||
// unsupported predicate | ||
return false; | ||
}; | ||
match expr.op() { | ||
Operator::And | Operator::Or => { | ||
let left = Self::prune_expr_with_bloom_filter( | ||
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(), | ||
column_sbbf, | ||
); | ||
let right = Self::prune_expr_with_bloom_filter( | ||
expr.right() | ||
.as_any() | ||
.downcast_ref::<phys_expr::BinaryExpr>(), | ||
column_sbbf, | ||
); | ||
match expr.op() { | ||
Operator::And => left || right, | ||
Operator::Or => left && right, | ||
_ => false, | ||
} | ||
} | ||
Operator::Eq => { | ||
if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) { | ||
if let Some(sbbf) = column_sbbf.get(col.name()) { | ||
match val { | ||
ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()), | ||
ScalarValue::Boolean(Some(v)) => !sbbf.check(&v), | ||
ScalarValue::Float64(Some(v)) => !sbbf.check(&v), | ||
ScalarValue::Float32(Some(v)) => !sbbf.check(&v), | ||
ScalarValue::Int64(Some(v)) => !sbbf.check(&v), | ||
ScalarValue::Int32(Some(v)) => !sbbf.check(&v), | ||
ScalarValue::Int16(Some(v)) => !sbbf.check(&v), | ||
ScalarValue::Int8(Some(v)) => !sbbf.check(&v), | ||
_ => false, | ||
} | ||
} else { | ||
false | ||
} | ||
} else { | ||
false | ||
} | ||
} | ||
_ => false, | ||
} | ||
fn num_containers(&self) -> usize { | ||
1 | ||
} | ||
|
||
fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> { | ||
let mut columns = HashSet::new(); | ||
expr.apply(&mut |expr| { | ||
if let Some(binary_expr) = | ||
expr.as_any().downcast_ref::<phys_expr::BinaryExpr>() | ||
{ | ||
if let Some((column, _)) = | ||
Self::check_expr_is_col_equal_const(binary_expr) | ||
{ | ||
columns.insert(column.name().to_string()); | ||
} | ||
} | ||
Ok(VisitRecursion::Continue) | ||
}) | ||
// no way to fail as only Ok(VisitRecursion::Continue) is returned | ||
.unwrap(); | ||
|
||
columns | ||
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> { | ||
None | ||
} | ||
|
||
fn check_expr_is_col_equal_const( | ||
exr: &phys_expr::BinaryExpr, | ||
) -> Option<(phys_expr::Column, ScalarValue)> { | ||
if Operator::Eq.ne(exr.op()) { | ||
return None; | ||
} | ||
/// Use bloom filters to determine if we are sure this column can not | ||
/// possibly contain `values` | ||
/// | ||
    /// The `contained` API returns false if the bloom filter knows that *ALL* | ||
/// of the values in a column are not present. | ||
fn contained( | ||
&self, | ||
column: &Column, | ||
values: &HashSet<ScalarValue>, | ||
) -> Option<BooleanArray> { | ||
let sbbf = self.column_sbbf.get(column.name.as_str())?; | ||
|
||
let left_any = exr.left().as_any(); | ||
let right_any = exr.right().as_any(); | ||
if let (Some(col), Some(liter)) = ( | ||
left_any.downcast_ref::<phys_expr::Column>(), | ||
right_any.downcast_ref::<phys_expr::Literal>(), | ||
) { | ||
return Some((col.clone(), liter.value().clone())); | ||
} | ||
if let (Some(liter), Some(col)) = ( | ||
left_any.downcast_ref::<phys_expr::Literal>(), | ||
right_any.downcast_ref::<phys_expr::Column>(), | ||
) { | ||
return Some((col.clone(), liter.value().clone())); | ||
} | ||
None | ||
// Bloom filters are probabilistic data structures that can return false | ||
// positives (i.e. it might return true even if the value is not | ||
        // present); however, the bloom filter will return `false` if the value is | ||
// definitely not present. | ||
|
||
let known_not_present = values | ||
.iter() | ||
.map(|value| match value { | ||
ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()), | ||
ScalarValue::Boolean(Some(v)) => sbbf.check(v), | ||
ScalarValue::Float64(Some(v)) => sbbf.check(v), | ||
ScalarValue::Float32(Some(v)) => sbbf.check(v), | ||
ScalarValue::Int64(Some(v)) => sbbf.check(v), | ||
ScalarValue::Int32(Some(v)) => sbbf.check(v), | ||
ScalarValue::Int16(Some(v)) => sbbf.check(v), | ||
ScalarValue::Int8(Some(v)) => sbbf.check(v), | ||
_ => true, | ||
}) | ||
// The row group doesn't contain any of the values if | ||
// all the checks are false | ||
.all(|v| !v); | ||
|
||
let contains = if known_not_present { | ||
Some(false) | ||
} else { | ||
// Given the bloom filter is probabilistic, we can't be sure that | ||
// the row group actually contains the values. Return `None` to | ||
// indicate this uncertainty | ||
None | ||
}; | ||
|
||
Some(BooleanArray::from(vec![contains])) | ||
} | ||
} | ||
|
||
|
@@ -1367,6 +1301,7 @@ mod tests { | |
|
||
let metadata = builder.metadata().clone(); | ||
let pruned_row_group = prune_row_groups_by_bloom_filters( | ||
pruning_predicate.schema(), | ||
&mut builder, | ||
row_groups, | ||
metadata.row_groups(), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also fixes a potential bug similar to #8335 by finding the correct fields