Skip to content

Commit 35da826

Browse files
committed
add check to verify predicate is always present when initiating probe side scan
1 parent ef2c773 commit 35da826

File tree

2 files changed

+15
-7
lines changed
  • datafusion/core/tests/physical_optimizer/filter_pushdown

2 files changed

+15
-7
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1065,7 +1065,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
10651065
];
10661066
let probe_repartition = Arc::new(
10671067
RepartitionExec::try_new(
1068-
probe_scan,
1068+
Arc::clone(&probe_scan),
10691069
Partitioning::Hash(probe_hash_exprs, partition_count),
10701070
)
10711071
.unwrap(),
@@ -1199,6 +1199,13 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
11991199

12001200
let result = format!("{}", pretty_format_batches(&batches).unwrap());
12011201

1202+
let probe_scan_metrics = probe_scan.metrics().unwrap();
1203+
1204+
// The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
1205+
// The number of output rows from the probe side scan should stay consistent across executions.
1206+
// Issue: https://github.com/apache/datafusion/issues/17451
1207+
assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);
1208+
12021209
insta::assert_snapshot!(
12031210
result,
12041211
@r"

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion_datasource::{
2727
};
2828
use datafusion_physical_expr_common::physical_expr::fmt_sql;
2929
use datafusion_physical_optimizer::PhysicalOptimizerRule;
30+
use datafusion_physical_plan::filter::batch_filter;
3031
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, PushedDown};
3132
use datafusion_physical_plan::{
3233
displayable,
@@ -62,12 +63,6 @@ impl FileOpener for TestOpener {
6263
_file_meta: FileMeta,
6364
_file: PartitionedFile,
6465
) -> Result<FileOpenFuture> {
65-
if let Some(predicate) = &self.predicate {
66-
println!(
67-
"Predicate when calling open: {}",
68-
fmt_sql(predicate.as_ref())
69-
);
70-
}
7166
let mut batches = self.batches.clone();
7267
if let Some(batch_size) = self.batch_size {
7368
let batch = concat_batches(&batches[0].schema(), &batches)?;
@@ -84,6 +79,12 @@ impl FileOpener for TestOpener {
8479
let (mapper, projection) = factory.map_schema(&batches[0].schema()).unwrap();
8580
let mut new_batches = Vec::new();
8681
for batch in batches {
82+
let batch = if let Some(predicate) = &self.predicate {
83+
batch_filter(&batch, predicate)?
84+
} else {
85+
batch
86+
};
87+
8788
let batch = batch.project(&projection).unwrap();
8889
let batch = mapper.map_batch(batch).unwrap();
8990
new_batches.push(batch);

0 commit comments

Comments
 (0)