Skip to content

Commit

Permalink
fix: Parquet predicate pushdown for lit(_) != (#19246)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Oct 16, 2024
1 parent d89fdcd commit 109c404
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 22 deletions.
8 changes: 0 additions & 8 deletions crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,14 +525,6 @@ fn apply_multiple_elementwise<'a>(
impl StatsEvaluator for ApplyExpr {
fn should_read(&self, stats: &BatchStats) -> PolarsResult<bool> {
let read = self.should_read_impl(stats)?;
if ExecutionState::new().verbose() {
if read {
eprintln!("parquet file must be read, statistics not sufficient for predicate.")
} else {
eprintln!("parquet file can be skipped, the statistics were sufficient to apply the predicate.")
}
}

Ok(read)
}
}
Expand Down
6 changes: 1 addition & 5 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ mod stats {
use ChunkCompareIneq as C;
match op {
Operator::Eq => apply_operator_stats_eq(min_max, literal),
Operator::NotEq => apply_operator_stats_eq(min_max, literal),
Operator::NotEq => apply_operator_stats_neq(min_max, literal),
Operator::Gt => {
// Literal is bigger than max value, selection needs all rows.
C::gt(literal, min_max).map(|ca| ca.any()).unwrap_or(false)
Expand Down Expand Up @@ -454,10 +454,6 @@ mod stats {

impl StatsEvaluator for BinaryExpr {
fn should_read(&self, stats: &BatchStats) -> PolarsResult<bool> {
if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
return Ok(true);
}

use Operator::*;
match (
self.left.as_stats_evaluator(),
Expand Down
33 changes: 27 additions & 6 deletions crates/polars-io/src/parquet/read/predicates.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use polars_core::config;
use polars_core::prelude::*;
use polars_parquet::read::statistics::{deserialize, Statistics};
use polars_parquet::read::RowGroupMetadata;
Expand Down Expand Up @@ -50,18 +51,38 @@ pub fn read_this_row_group(
md: &RowGroupMetadata,
schema: &ArrowSchema,
) -> PolarsResult<bool> {
if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
return Ok(true);
}

let mut should_read = true;

if let Some(pred) = predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(md, schema)? {
let should_read = pred.should_read(&stats);
let pred_result = pred.should_read(&stats);

// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
return Ok(false);
} else if !matches!(should_read, Err(PolarsError::ColumnNotFound(_))) {
let _ = should_read?;
match pred_result {
Err(PolarsError::ColumnNotFound(errstr)) => {
return Err(PolarsError::ColumnNotFound(errstr))
},
Ok(false) => should_read = false,
_ => {},
}
}
}

if config::verbose() {
if should_read {
eprintln!(
"parquet row group must be read, statistics not sufficient for predicate."
);
} else {
eprintln!("parquet row group can be skipped, the statistics were sufficient to apply the predicate.");
}
}
}
Ok(true)

Ok(should_read)
}
6 changes: 3 additions & 3 deletions py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ def test_parquet_is_in_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path)

captured = capfd.readouterr().err
assert (
"parquet file must be read, statistics not sufficient for predicate."
"parquet row group must be read, statistics not sufficient for predicate."
in captured
)
assert (
"parquet file can be skipped, the statistics were sufficient"
" to apply the predicate." in captured
"parquet row group can be skipped, the statistics were sufficient to apply the predicate."
in captured
)


Expand Down

0 comments on commit 109c404

Please sign in to comment.