Rewrite bloom filters to use contains API #8442

Merged · 1 commit · Dec 26, 2023
@@ -522,6 +522,7 @@ impl FileOpener for ParquetOpener {
if enable_bloom_filter && !row_groups.is_empty() {
if let Some(predicate) = predicate {
row_groups = row_groups::prune_row_groups_by_bloom_filters(
&file_schema,
&mut builder,
&row_groups,
file_metadata.row_groups(),
245 changes: 90 additions & 155 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -18,28 +18,21 @@
use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
use datafusion_common::{Column, ScalarValue};
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
bloom_filter::Sbbf,
file::metadata::RowGroupMetaData,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::collections::{HashMap, HashSet};

use crate::datasource::listing::FileRange;
use crate::datasource::physical_plan::parquet::statistics::{
max_statistics, min_statistics, parquet_column,
};
use crate::logical_expr::Operator;
use crate::physical_expr::expressions as phys_expr;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::PhysicalExpr;

use super::ParquetFileMetrics;

@@ -118,188 +111,129 @@ pub(crate) fn prune_row_groups_by_bloom_filters(
pub(crate) async fn prune_row_groups_by_bloom_filters<
T: AsyncFileReader + Send + 'static,
>(
arrow_schema: &Schema,
builder: &mut ParquetRecordBatchStreamBuilder<T>,
row_groups: &[usize],
groups: &[RowGroupMetaData],
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) -> Vec<usize> {
let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
{
Ok(predicates) => predicates,
Err(_) => {
return row_groups.to_vec();
}
};
let mut filtered = Vec::with_capacity(groups.len());
for idx in row_groups {
let rg_metadata = &groups[*idx];
// get all columns bloom filter
let mut column_sbbf =
HashMap::with_capacity(bf_predicates.required_columns.len());
for column_name in bf_predicates.required_columns.iter() {
let column_idx = match rg_metadata
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string().eq(column_name))
{
Some((column_idx, _)) => column_idx,
None => continue,
// get all columns in the predicate that we could use a bloom filter with
let literal_columns = predicate.literal_columns();
let mut column_sbbf = HashMap::with_capacity(literal_columns.len());

for column_name in literal_columns {
let Some((column_idx, _field)) =
parquet_column(builder.parquet_schema(), arrow_schema, &column_name)
[Review comment from the PR author (contributor)]
This also fixes a potential bug similar to #8335 by finding the correct fields

else {
continue;
};

let bf = match builder
.get_row_group_column_bloom_filter(*idx, column_idx)
.await
{
Ok(bf) => match bf {
Some(bf) => bf,
None => {
continue;
}
},
Ok(Some(bf)) => bf,
Ok(None) => continue, // no bloom filter for this column
Err(e) => {
log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}");
log::debug!("Ignoring error reading bloom filter: {e}");
[Review comment from the PR author (contributor)]
In general, I think error-level log messages are meant to mean "something bad happened that caused a user-visible error" -- in this case the query will proceed, so I think debug is a better level.

If someone needs it in production logs, perhaps we can change the level then.

[Review comment from a member]
debug level looks good to me. Non-fatal errors on the query path could easily flood the logs 👍

metrics.predicate_evaluation_errors.add(1);
continue;
}
};
column_sbbf.insert(column_name.to_owned(), bf);
column_sbbf.insert(column_name.to_string(), bf);
}
if bf_predicates.prune(&column_sbbf) {

let stats = BloomFilterStatistics { column_sbbf };

// Can this group be pruned?
let prune_group = match predicate.prune(&stats) {
Ok(values) => !values[0],
Err(e) => {
log::debug!("Error evaluating row group predicate on bloom filter: {e}");
metrics.predicate_evaluation_errors.add(1);
false
}
};

if prune_group {
metrics.row_groups_pruned.add(1);
continue;
} else {
filtered.push(*idx);
}
filtered.push(*idx);
}
filtered
}

struct BloomFilterPruningPredicate {
/// Actual pruning predicate
predicate_expr: Option<phys_expr::BinaryExpr>,
/// The statistics required to evaluate this predicate
required_columns: Vec<String>,
/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF)
struct BloomFilterStatistics {
/// Maps column name to the parquet bloom filter
column_sbbf: HashMap<String, Sbbf>,
}

impl BloomFilterPruningPredicate {
fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
match binary_expr {
Some(binary_expr) => {
let columns = Self::get_predicate_columns(expr);
Ok(Self {
predicate_expr: Some(binary_expr.clone()),
required_columns: columns.into_iter().collect(),
})
}
None => Err(DataFusionError::Execution(
"BloomFilterPruningPredicate only support binary expr".to_string(),
)),
}
impl PruningStatistics for BloomFilterStatistics {
fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
None
}

fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
None
}

/// Return true if the `expr` can be proved not `true`
/// based on the bloom filter.
///
/// We only check `BinaryExpr`, but `InList` is also supported,
/// because the optimizer will convert `InList` to `BinaryExpr`.
fn prune_expr_with_bloom_filter(
[Review comment from the PR author (contributor)]
All of this logic is now split between LiteralGuarantee #8437 and PruningPredicate: #8440

expr: Option<&phys_expr::BinaryExpr>,
column_sbbf: &HashMap<String, Sbbf>,
) -> bool {
let Some(expr) = expr else {
// unsupported predicate
return false;
};
match expr.op() {
Operator::And | Operator::Or => {
let left = Self::prune_expr_with_bloom_filter(
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
let right = Self::prune_expr_with_bloom_filter(
expr.right()
.as_any()
.downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
match expr.op() {
Operator::And => left || right,
Operator::Or => left && right,
_ => false,
}
}
Operator::Eq => {
if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
if let Some(sbbf) = column_sbbf.get(col.name()) {
match val {
ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
_ => false,
}
} else {
false
}
} else {
false
}
}
_ => false,
}
fn num_containers(&self) -> usize {
1
}

fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
let mut columns = HashSet::new();
expr.apply(&mut |expr| {
if let Some(binary_expr) =
expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
{
if let Some((column, _)) =
Self::check_expr_is_col_equal_const(binary_expr)
{
columns.insert(column.name().to_string());
}
}
Ok(VisitRecursion::Continue)
})
// no way to fail as only Ok(VisitRecursion::Continue) is returned
.unwrap();

columns
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}

fn check_expr_is_col_equal_const(
exr: &phys_expr::BinaryExpr,
) -> Option<(phys_expr::Column, ScalarValue)> {
if Operator::Eq.ne(exr.op()) {
return None;
}
/// Use bloom filters to determine if we are sure this column cannot
/// possibly contain `values`
///
/// The `contained` API returns `false` if the bloom filter knows that *ALL*
/// of the values in a column are not present.
fn contained(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
let sbbf = self.column_sbbf.get(column.name.as_str())?;

let left_any = exr.left().as_any();
let right_any = exr.right().as_any();
if let (Some(col), Some(liter)) = (
left_any.downcast_ref::<phys_expr::Column>(),
right_any.downcast_ref::<phys_expr::Literal>(),
) {
return Some((col.clone(), liter.value().clone()));
}
if let (Some(liter), Some(col)) = (
left_any.downcast_ref::<phys_expr::Literal>(),
right_any.downcast_ref::<phys_expr::Column>(),
) {
return Some((col.clone(), liter.value().clone()));
}
None
// Bloom filters are probabilistic data structures that can return false
// positives (i.e. `check` may return true even if the value is not
// present). However, if `check` returns `false`, the value is definitely
// not present.

let known_not_present = values
.iter()
.map(|value| match value {
ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => sbbf.check(v),
ScalarValue::Float64(Some(v)) => sbbf.check(v),
ScalarValue::Float32(Some(v)) => sbbf.check(v),
ScalarValue::Int64(Some(v)) => sbbf.check(v),
ScalarValue::Int32(Some(v)) => sbbf.check(v),
ScalarValue::Int16(Some(v)) => sbbf.check(v),
ScalarValue::Int8(Some(v)) => sbbf.check(v),
_ => true,
})
// The row group doesn't contain any of the values if
// all the checks are false
.all(|v| !v);

let contains = if known_not_present {
Some(false)
} else {
// Given the bloom filter is probabilistic, we can't be sure that
// the row group actually contains the values. Return `None` to
// indicate this uncertainty
None
};

Some(BooleanArray::from(vec![contains]))
}
}
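To make the pruning rule above concrete, here is a minimal, self-contained sketch (not part of the PR; the standalone helper is hypothetical) of the same `all(|v| !v)` check that `contained` applies to the `sbbf.check` results:

```rust
/// Hypothetical illustration of the pruning rule: each entry in `checks`
/// is the result of `sbbf.check(&value)` for one literal in the predicate.
///   true  => the value *may* be present (bloom filters can false-positive)
///   false => the value is definitely not present
fn known_not_present(checks: &[bool]) -> bool {
    // Only if *every* candidate value is definitely absent can the row
    // group be pruned (`contained` then returns `Some(false)`); otherwise
    // the result is unknown (`None`) and the row group is kept.
    checks.iter().all(|&may_be_present| !may_be_present)
}

fn main() {
    assert!(known_not_present(&[false, false])); // prune the row group
    assert!(!known_not_present(&[false, true])); // keep it: one value may match
}
```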

@@ -1367,6 +1301,7 @@ mod tests {

let metadata = builder.metadata().clone();
let pruned_row_group = prune_row_groups_by_bloom_filters(
pruning_predicate.schema(),
&mut builder,
row_groups,
metadata.row_groups(),