Skip to content

Commit 1369c45

Browse files
adriangb authored and LiaCastaneda committed
Fix dynamic filter pushdown in HashJoinExec (apache#17201)
(cherry picked from commit 1d4d74b)
1 parent 1a16ba9 commit 1369c45

File tree

3 files changed

+110
-23
lines changed

3 files changed

+110
-23
lines changed

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
6868
/// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
6969
/// [`create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html
7070
/// [`Column`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/expressions/struct.Column.html
71-
pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
71+
pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
7272
/// Returns the physical expression as [`Any`] so that it can be
7373
/// downcast to a specific implementation.
7474
fn as_any(&self) -> &dyn Any;

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,6 @@ impl JoinLeftData {
329329
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
330330
/// loading of the left side with the processing in each output stream.
331331
/// Therefore it cannot be [`Clone`]
332-
#[derive(Debug)]
333332
pub struct HashJoinExec {
334333
/// left (build) side which gets hashed
335334
pub left: Arc<dyn ExecutionPlan>,
@@ -350,7 +349,7 @@ pub struct HashJoinExec {
350349
///
351350
/// Each output stream waits on the `OnceAsync` to signal the completion of
352351
/// the hash table creation.
353-
left_fut: OnceAsync<JoinLeftData>,
352+
left_fut: Arc<OnceAsync<JoinLeftData>>,
354353
/// Shared `RandomState` for the hashing algorithm
355354
random_state: RandomState,
356355
/// Partitioning mode to use
@@ -366,7 +365,29 @@ pub struct HashJoinExec {
366365
/// Cache holding plan properties like equivalences, output partitioning etc.
367366
cache: PlanProperties,
368367
/// Dynamic filter for pushing down to the probe side
369-
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
368+
dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
369+
}
370+
371+
impl fmt::Debug for HashJoinExec {
372+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373+
f.debug_struct("HashJoinExec")
374+
.field("left", &self.left)
375+
.field("right", &self.right)
376+
.field("on", &self.on)
377+
.field("filter", &self.filter)
378+
.field("join_type", &self.join_type)
379+
.field("join_schema", &self.join_schema)
380+
.field("left_fut", &self.left_fut)
381+
.field("random_state", &self.random_state)
382+
.field("mode", &self.mode)
383+
.field("metrics", &self.metrics)
384+
.field("projection", &self.projection)
385+
.field("column_indices", &self.column_indices)
386+
.field("null_equality", &self.null_equality)
387+
.field("cache", &self.cache)
388+
// Explicitly exclude dynamic_filter to avoid runtime state differences in tests
389+
.finish()
390+
}
370391
}
371392

372393
impl HashJoinExec {
@@ -413,8 +434,6 @@ impl HashJoinExec {
413434
projection.as_ref(),
414435
)?;
415436

416-
let dynamic_filter = Self::create_dynamic_filter(&on);
417-
418437
Ok(HashJoinExec {
419438
left,
420439
right,
@@ -430,12 +449,13 @@ impl HashJoinExec {
430449
column_indices,
431450
null_equality,
432451
cache,
433-
dynamic_filter,
452+
dynamic_filter: None,
434453
})
435454
}
436455

437456
fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
438-
// Extract the right-side keys from the `on` clauses
457+
// Extract the right-side keys (probe side keys) from the `on` clauses
458+
// Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
439459
let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
440460
// Initialize with a placeholder expression (true) that will be updated when the hash table is built
441461
Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
@@ -686,11 +706,14 @@ impl DisplayAs for HashJoinExec {
686706
.map(|(c1, c2)| format!("({c1}, {c2})"))
687707
.collect::<Vec<String>>()
688708
.join(", ");
689-
let dynamic_filter_display = match self.dynamic_filter.current() {
690-
Ok(current) if current != lit(true) => {
691-
format!(", filter=[{current}]")
692-
}
693-
_ => "".to_string(),
709+
let dynamic_filter_display = match self.dynamic_filter.as_ref() {
710+
Some(dynamic_filter) => match dynamic_filter.current() {
711+
Ok(current) if current != lit(true) => {
712+
format!(", filter=[{current}]")
713+
}
714+
_ => "".to_string(),
715+
},
716+
None => "".to_string(),
694717
};
695718
write!(
696719
f,
@@ -787,7 +810,7 @@ impl ExecutionPlan for HashJoinExec {
787810
self: Arc<Self>,
788811
children: Vec<Arc<dyn ExecutionPlan>>,
789812
) -> Result<Arc<dyn ExecutionPlan>> {
790-
let mut new_join = HashJoinExec::try_new(
813+
let new_join = HashJoinExec::try_new(
791814
Arc::clone(&children[0]),
792815
Arc::clone(&children[1]),
793816
self.on.clone(),
@@ -797,8 +820,6 @@ impl ExecutionPlan for HashJoinExec {
797820
self.mode,
798821
self.null_equality,
799822
)?;
800-
// Preserve the dynamic filter if it exists
801-
new_join.dynamic_filter = Arc::clone(&self.dynamic_filter);
802823
Ok(Arc::new(new_join))
803824
}
804825

@@ -811,15 +832,15 @@ impl ExecutionPlan for HashJoinExec {
811832
filter: self.filter.clone(),
812833
join_type: self.join_type,
813834
join_schema: Arc::clone(&self.join_schema),
814-
left_fut: OnceAsync::default(),
835+
left_fut: Arc::new(OnceAsync::default()),
815836
random_state: self.random_state.clone(),
816837
mode: self.mode,
817838
metrics: ExecutionPlanMetricsSet::new(),
818839
projection: self.projection.clone(),
819840
column_indices: self.column_indices.clone(),
820841
null_equality: self.null_equality,
821842
cache: self.cache.clone(),
822-
dynamic_filter: Self::create_dynamic_filter(&self.on),
843+
dynamic_filter: None,
823844
}))
824845
}
825846

@@ -879,7 +900,8 @@ impl ExecutionPlan for HashJoinExec {
879900
need_produce_result_in_final(self.join_type),
880901
self.right().output_partitioning().partition_count(),
881902
enable_dynamic_filter_pushdown
882-
.then_some(Arc::clone(&self.dynamic_filter)),
903+
.then_some(self.dynamic_filter.clone())
904+
.flatten(),
883905
on_right.clone(),
884906
))
885907
})?,
@@ -899,7 +921,8 @@ impl ExecutionPlan for HashJoinExec {
899921
need_produce_result_in_final(self.join_type),
900922
1,
901923
enable_dynamic_filter_pushdown
902-
.then_some(Arc::clone(&self.dynamic_filter)),
924+
.then_some(self.dynamic_filter.clone())
925+
.flatten(),
903926
on_right.clone(),
904927
))
905928
}
@@ -1043,8 +1066,7 @@ impl ExecutionPlan for HashJoinExec {
10431066
&& config.optimizer.enable_dynamic_filter_pushdown
10441067
{
10451068
// Add actual dynamic filter to right side (probe side)
1046-
let dynamic_filter =
1047-
Arc::clone(&self.dynamic_filter) as Arc<dyn PhysicalExpr>;
1069+
let dynamic_filter = Self::create_dynamic_filter(&self.on);
10481070
right_child = right_child.with_self_filter(dynamic_filter);
10491071
}
10501072

@@ -1071,7 +1093,40 @@ impl ExecutionPlan for HashJoinExec {
10711093
child_pushdown_result,
10721094
));
10731095
}
1074-
Ok(FilterPushdownPropagation::if_any(child_pushdown_result))
1096+
1097+
let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1098+
assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
1099+
let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
1100+
// We expect 0 or 1 self filters
1101+
if let Some(filter) = right_child_self_filters.first() {
1102+
// Note that we don't check PushedDownPredicate::discriminant because even if nothing said
1103+
// "yes, I can fully evaluate this filter", other operators might still use it for statistics, so it is worth updating
1104+
let predicate = Arc::clone(&filter.predicate);
1105+
if let Ok(dynamic_filter) =
1106+
Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1107+
{
1108+
// We successfully pushed down our self filter - we need to make a new node with the dynamic filter
1109+
let new_node = Arc::new(HashJoinExec {
1110+
left: Arc::clone(&self.left),
1111+
right: Arc::clone(&self.right),
1112+
on: self.on.clone(),
1113+
filter: self.filter.clone(),
1114+
join_type: self.join_type,
1115+
join_schema: Arc::clone(&self.join_schema),
1116+
left_fut: Arc::clone(&self.left_fut),
1117+
random_state: self.random_state.clone(),
1118+
mode: self.mode,
1119+
metrics: ExecutionPlanMetricsSet::new(),
1120+
projection: self.projection.clone(),
1121+
column_indices: self.column_indices.clone(),
1122+
null_equality: self.null_equality,
1123+
cache: self.cache.clone(),
1124+
dynamic_filter: Some(dynamic_filter),
1125+
});
1126+
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
1127+
}
1128+
}
1129+
Ok(result)
10751130
}
10761131
}
10771132

datafusion/sqllogictest/test_files/push_down_filter.slt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,5 +267,37 @@ explain select a from t where CAST(a AS string) = '0123';
267267
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
268268

269269

270+
# Test dynamic filter pushdown with swapped join inputs (issue #17196)
271+
# Create tables with different sizes to force join input swapping
272+
statement ok
273+
copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
274+
275+
statement ok
276+
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
277+
278+
statement ok
279+
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
280+
281+
statement ok
282+
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
283+
284+
# Test that dynamic filter is applied to the correct table after join input swapping
285+
# The small_table should be the build side, large_table should be the probe side with dynamic filter
286+
query TT
287+
explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
288+
----
289+
physical_plan
290+
01)CoalesceBatchesExec: target_batch_size=8192
291+
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
292+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
293+
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
294+
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
295+
296+
statement ok
297+
drop table small_table;
298+
299+
statement ok
300+
drop table large_table;
301+
270302
statement ok
271303
drop table t;

0 commit comments

Comments
 (0)