Skip to content

Commit

Permalink
chore(pruning): Support IS NOT NULL predicates in PruningPredicate (
Browse files Browse the repository at this point in the history
#9208)

* chore: add test cases for predicate is_null and is_not_null

* feat(pruning): support predicate build for is_not_null expression

* doc: add example in doc for `IS NOT NULL`

* chore: remove edit on cargo file

* chore: add `IS NOT NULL` test for row group pruning

chore: remove Debug derive

* chore: update comment null --> NULL

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>

* chore: update comment

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>

---------

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
  • Loading branch information
appletreeisyellow and viirya authored Feb 14, 2024
1 parent 0c46d7f commit cc139c9
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 5 deletions.
49 changes: 44 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,13 +620,20 @@ mod tests {
ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
],
);
vec![rgm1, rgm2]
let rgm3 = get_row_group_meta_data(
&schema_descr,
vec![
ParquetStatistics::int32(Some(17), Some(30), None, 1, false),
ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
],
);
vec![rgm1, rgm2, rgm3]
}

#[test]
fn row_group_pruning_predicate_null_expr() {
use datafusion_expr::{col, lit};
// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
// c1 > 15 and IsNull(c2) => c1_max > 15 and c2_null_count > 0
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
Expand Down Expand Up @@ -657,7 +664,7 @@ mod tests {
use datafusion_expr::{col, lit};
// test row group predicate with an unknown (Null) expr
//
// int > 1 and bool = NULL => c1_max > 1 and null
// c1 > 15 and c2 = NULL => c1_max > 15 and NULL
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
Expand All @@ -672,7 +679,7 @@ mod tests {

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates. Ideally these should both be false
// pass predicates. Ideally these should all be false
assert_eq!(
prune_row_groups_by_statistics(
&schema,
Expand All @@ -682,7 +689,39 @@ mod tests {
Some(&pruning_predicate),
&metrics
),
vec![1]
vec![1, 2]
);
}

#[test]
fn row_group_pruning_predicate_not_null_expr() {
use datafusion_expr::{col, lit};
// c1 > 15 and IsNotNull(c2) => c1_max > 15 and c2_null_count = 0
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
]));
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1").gt(lit(15)).and(col("c2").is_not_null());
let expr = logical2physical(&expr, &schema);
let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();

let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
&schema,
&schema_descr,
&groups,
None,
Some(&pruning_predicate),
&metrics
),
// The first row group was filtered out because c1_max is 10, which is smaller than 15.
// The second row group was filtered out because it contains null value on "c2".
// The third row group is kept because c1_max is 30, which is greater than 15 AND
// it does NOT contain any null value on "c2".
vec![2]
);
}

Expand Down
63 changes: 63 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ pub trait PruningStatistics {
/// `x < 5` | `x_max < 5`
/// `x = 5 AND y = 10` | `x_min <= 5 AND 5 <= x_max AND y_min <= 10 AND 10 <= y_max`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count = 0`
///
/// ## Predicate Evaluation
/// The PruningPredicate works in two passes
Expand Down Expand Up @@ -1120,6 +1121,34 @@ fn build_is_null_column_expr(
}
}

/// Given an expression reference to `expr`, if `expr` is a column expression,
/// returns a pruning expression in terms of IsNotNull that will evaluate to true
/// if the column does NOT contain null, and false if it may contain null
fn build_is_not_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
required_columns: &mut RequiredColumns,
) -> Option<Arc<dyn PhysicalExpr>> {
if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
let field = schema.field_with_name(col.name()).ok()?;

let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNotNull(column) => null_count = 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Eq,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
})
.ok()
} else {
None
}
}

/// The maximum number of entries in an `InList` that might be rewritten into
/// an OR chain
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
Expand All @@ -1146,6 +1175,14 @@ fn build_predicate_expression(
return build_is_null_column_expr(is_null.arg(), schema, required_columns)
.unwrap_or(unhandled);
}
if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
return build_is_not_null_column_expr(
is_not_null.arg(),
schema,
required_columns,
)
.unwrap_or(unhandled);
}
if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
return build_single_column_expr(col, schema, required_columns, false)
.unwrap_or(unhandled);
Expand Down Expand Up @@ -2052,6 +2089,32 @@ mod tests {
Ok(())
}

#[test]
fn row_group_predicate_is_null() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let expected_expr = "c1_null_count@0 > 0";

let expr = col("c1").is_null();
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);

Ok(())
}

#[test]
fn row_group_predicate_is_not_null() -> Result<()> {
let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
let expected_expr = "c1_null_count@0 = 0";

let expr = col("c1").is_not_null();
let predicate_expr =
test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
assert_eq!(predicate_expr.to_string(), expected_expr);

Ok(())
}

#[test]
fn row_group_predicate_required_columns() -> Result<()> {
let schema = Schema::new(vec![
Expand Down

0 comments on commit cc139c9

Please sign in to comment.