Skip to content

Commit ed37467

Browse files
Ted-Jiangwaynexia
andauthored
Prune columns are all null in ParquetExec by row_counts , handle IS NOT NULL (#9989)
* Prune columns are all null in ParquetExec by row_counts in pruning statistics * fix clippy * Update datafusion/core/tests/parquet/row_group_pruning.rs Co-authored-by: Ruihang Xia <waynestxia@gmail.com> * fix comment and support isNotNUll * add test * fix conflict --------- Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
1 parent 1ec65a4 commit ed37467

File tree

4 files changed

+128
-10
lines changed

4 files changed

+128
-10
lines changed

datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,10 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
338338
scalar.to_array().ok()
339339
}
340340

341-
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
342-
None
341+
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
342+
let (c, _) = self.column(&column.name)?;
343+
let scalar = ScalarValue::UInt64(Some(c.num_values() as u64));
344+
scalar.to_array().ok()
343345
}
344346

345347
fn contained(
@@ -1022,15 +1024,17 @@ mod tests {
10221024
column_statistics: Vec<ParquetStatistics>,
10231025
) -> RowGroupMetaData {
10241026
let mut columns = vec![];
1027+
let number_row = 1000;
10251028
for (i, s) in column_statistics.iter().enumerate() {
10261029
let column = ColumnChunkMetaData::builder(schema_descr.column(i))
10271030
.set_statistics(s.clone())
1031+
.set_num_values(number_row)
10281032
.build()
10291033
.unwrap();
10301034
columns.push(column);
10311035
}
10321036
RowGroupMetaData::builder(schema_descr.clone())
1033-
.set_num_rows(1000)
1037+
.set_num_rows(number_row)
10341038
.set_total_byte_size(2000)
10351039
.set_column_metadata(columns)
10361040
.build()

datafusion/core/src/physical_optimizer/pruning.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ pub trait PruningStatistics {
335335
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
336336
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END`
337337
/// `x IS NULL` | `x_null_count > 0`
338+
/// `x IS NOT NULL` | `x_null_count = 0`
338339
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
339340
///
340341
/// ## Predicate Evaluation
@@ -1239,10 +1240,15 @@ fn build_single_column_expr(
12391240
/// returns a pruning expression in terms of IsNull that will evaluate to true
12401241
/// if the column may contain null, and false if definitely does not
12411242
/// contain null.
1243+
/// If set `with_not` to true: which means is not null
1244+
/// Given an expression reference to `expr`, if `expr` is a column expression,
1245+
/// returns a pruning expression in terms of IsNotNull that will evaluate to true
1246+
/// if the column not contain any null, and false if definitely contain null.
12421247
fn build_is_null_column_expr(
12431248
expr: &Arc<dyn PhysicalExpr>,
12441249
schema: &Schema,
12451250
required_columns: &mut RequiredColumns,
1251+
with_not: bool,
12461252
) -> Option<Arc<dyn PhysicalExpr>> {
12471253
if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
12481254
let field = schema.field_with_name(col.name()).ok()?;
@@ -1251,12 +1257,21 @@ fn build_is_null_column_expr(
12511257
required_columns
12521258
.null_count_column_expr(col, expr, null_count_field)
12531259
.map(|null_count_column_expr| {
1254-
// IsNull(column) => null_count > 0
1255-
Arc::new(phys_expr::BinaryExpr::new(
1256-
null_count_column_expr,
1257-
Operator::Gt,
1258-
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1259-
)) as _
1260+
if with_not {
1261+
// IsNotNull(column) => null_count = 0
1262+
Arc::new(phys_expr::BinaryExpr::new(
1263+
null_count_column_expr,
1264+
Operator::Eq,
1265+
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1266+
)) as _
1267+
} else {
1268+
// IsNull(column) => null_count > 0
1269+
Arc::new(phys_expr::BinaryExpr::new(
1270+
null_count_column_expr,
1271+
Operator::Gt,
1272+
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1273+
)) as _
1274+
}
12601275
})
12611276
.ok()
12621277
} else {
@@ -1287,9 +1302,18 @@ fn build_predicate_expression(
12871302
// predicate expression can only be a binary expression
12881303
let expr_any = expr.as_any();
12891304
if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1290-
return build_is_null_column_expr(is_null.arg(), schema, required_columns)
1305+
return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
12911306
.unwrap_or(unhandled);
12921307
}
1308+
if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1309+
return build_is_null_column_expr(
1310+
is_not_null.arg(),
1311+
schema,
1312+
required_columns,
1313+
true,
1314+
)
1315+
.unwrap_or(unhandled);
1316+
}
12931317
if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
12941318
return build_single_column_expr(col, schema, required_columns, false)
12951319
.unwrap_or(unhandled);

datafusion/core/tests/parquet/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use arrow::{
2828
record_batch::RecordBatch,
2929
util::pretty::pretty_format_batches,
3030
};
31+
use arrow_array::new_null_array;
3132
use chrono::{Datelike, Duration, TimeDelta};
3233
use datafusion::{
3334
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -75,6 +76,7 @@ enum Scenario {
7576
DecimalLargePrecisionBloomFilter,
7677
ByteArray,
7778
PeriodsInColumnNames,
79+
WithNullValues,
7880
}
7981

8082
enum Unit {
@@ -630,6 +632,27 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch {
630632
RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap()
631633
}
632634

635+
/// Return record batch with i8, i16, i32, and i64 sequences with all Null values
636+
fn make_all_null_values() -> RecordBatch {
637+
let schema = Arc::new(Schema::new(vec![
638+
Field::new("i8", DataType::Int8, true),
639+
Field::new("i16", DataType::Int16, true),
640+
Field::new("i32", DataType::Int32, true),
641+
Field::new("i64", DataType::Int64, true),
642+
]));
643+
644+
RecordBatch::try_new(
645+
schema,
646+
vec![
647+
new_null_array(&DataType::Int8, 5),
648+
new_null_array(&DataType::Int16, 5),
649+
new_null_array(&DataType::Int32, 5),
650+
new_null_array(&DataType::Int64, 5),
651+
],
652+
)
653+
.unwrap()
654+
}
655+
633656
fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
634657
match scenario {
635658
Scenario::Timestamps => {
@@ -799,6 +822,13 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
799822
),
800823
]
801824
}
825+
Scenario::WithNullValues => {
826+
vec![
827+
make_all_null_values(),
828+
make_int_batches(1, 6),
829+
make_all_null_values(),
830+
]
831+
}
802832
}
803833
}
804834

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,3 +1262,63 @@ async fn prune_periods_in_column_names() {
12621262
.test_row_group_prune()
12631263
.await;
12641264
}
1265+
1266+
#[tokio::test]
1267+
async fn test_row_group_with_null_values() {
1268+
// Three row groups:
1269+
// 1. all Null values
1270+
// 2. values from 1 to 5
1271+
// 3. all Null values
1272+
1273+
// After pruning, only row group 2 should be selected
1274+
RowGroupPruningTest::new()
1275+
.with_scenario(Scenario::WithNullValues)
1276+
.with_query("SELECT * FROM t WHERE \"i8\" <= 5")
1277+
.with_expected_errors(Some(0))
1278+
.with_matched_by_stats(Some(1))
1279+
.with_pruned_by_stats(Some(2))
1280+
.with_expected_rows(5)
1281+
.with_matched_by_bloom_filter(Some(0))
1282+
.with_pruned_by_bloom_filter(Some(0))
1283+
.test_row_group_prune()
1284+
.await;
1285+
1286+
// After pruning, only row group 1,3 should be selected
1287+
RowGroupPruningTest::new()
1288+
.with_scenario(Scenario::WithNullValues)
1289+
.with_query("SELECT * FROM t WHERE \"i8\" is Null")
1290+
.with_expected_errors(Some(0))
1291+
.with_matched_by_stats(Some(2))
1292+
.with_pruned_by_stats(Some(1))
1293+
.with_expected_rows(10)
1294+
.with_matched_by_bloom_filter(Some(0))
1295+
.with_pruned_by_bloom_filter(Some(0))
1296+
.test_row_group_prune()
1297+
.await;
1298+
1299+
// After pruning, only row group 2should be selected
1300+
RowGroupPruningTest::new()
1301+
.with_scenario(Scenario::WithNullValues)
1302+
.with_query("SELECT * FROM t WHERE \"i16\" is Not Null")
1303+
.with_expected_errors(Some(0))
1304+
.with_matched_by_stats(Some(1))
1305+
.with_pruned_by_stats(Some(2))
1306+
.with_expected_rows(5)
1307+
.with_matched_by_bloom_filter(Some(0))
1308+
.with_pruned_by_bloom_filter(Some(0))
1309+
.test_row_group_prune()
1310+
.await;
1311+
1312+
// All row groups will be pruned
1313+
RowGroupPruningTest::new()
1314+
.with_scenario(Scenario::WithNullValues)
1315+
.with_query("SELECT * FROM t WHERE \"i32\" > 7")
1316+
.with_expected_errors(Some(0))
1317+
.with_matched_by_stats(Some(0))
1318+
.with_pruned_by_stats(Some(3))
1319+
.with_expected_rows(0)
1320+
.with_matched_by_bloom_filter(Some(0))
1321+
.with_pruned_by_bloom_filter(Some(0))
1322+
.test_row_group_prune()
1323+
.await;
1324+
}

0 commit comments

Comments
 (0)