Skip to content

Commit 1d4d74b

Browse files
authored
Fix dynamic filter pushdown in HashJoinExec (apache#17201)
1 parent 3eceedc commit 1d4d74b

File tree

3 files changed

+110
-23
lines changed

3 files changed

+110
-23
lines changed

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
6868
/// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
6969
/// [`create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html
7070
/// [`Column`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/expressions/struct.Column.html
71-
pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
71+
pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
7272
/// Returns the physical expression as [`Any`] so that it can be
7373
/// downcast to a specific implementation.
7474
fn as_any(&self) -> &dyn Any;

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,6 @@ impl JoinLeftData {
329329
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
330330
/// loading of the left side with the processing in each output stream.
331331
/// Therefore it can not be [`Clone`]
332-
#[derive(Debug)]
333332
pub struct HashJoinExec {
334333
/// left (build) side which gets hashed
335334
pub left: Arc<dyn ExecutionPlan>,
@@ -350,7 +349,7 @@ pub struct HashJoinExec {
350349
///
351350
/// Each output stream waits on the `OnceAsync` to signal the completion of
352351
/// the hash table creation.
353-
left_fut: OnceAsync<JoinLeftData>,
352+
left_fut: Arc<OnceAsync<JoinLeftData>>,
354353
/// Shared `RandomState` for the hashing algorithm
355354
random_state: RandomState,
356355
/// Partitioning mode to use
@@ -366,7 +365,29 @@ pub struct HashJoinExec {
366365
/// Cache holding plan properties like equivalences, output partitioning etc.
367366
cache: PlanProperties,
368367
/// Dynamic filter for pushing down to the probe side
369-
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
368+
dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
369+
}
370+
371+
impl fmt::Debug for HashJoinExec {
372+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373+
f.debug_struct("HashJoinExec")
374+
.field("left", &self.left)
375+
.field("right", &self.right)
376+
.field("on", &self.on)
377+
.field("filter", &self.filter)
378+
.field("join_type", &self.join_type)
379+
.field("join_schema", &self.join_schema)
380+
.field("left_fut", &self.left_fut)
381+
.field("random_state", &self.random_state)
382+
.field("mode", &self.mode)
383+
.field("metrics", &self.metrics)
384+
.field("projection", &self.projection)
385+
.field("column_indices", &self.column_indices)
386+
.field("null_equality", &self.null_equality)
387+
.field("cache", &self.cache)
388+
// Explicitly exclude dynamic_filter to avoid runtime state differences in tests
389+
.finish()
390+
}
370391
}
371392

372393
impl HashJoinExec {
@@ -413,8 +434,6 @@ impl HashJoinExec {
413434
projection.as_ref(),
414435
)?;
415436

416-
let dynamic_filter = Self::create_dynamic_filter(&on);
417-
418437
Ok(HashJoinExec {
419438
left,
420439
right,
@@ -430,12 +449,13 @@ impl HashJoinExec {
430449
column_indices,
431450
null_equality,
432451
cache,
433-
dynamic_filter,
452+
dynamic_filter: None,
434453
})
435454
}
436455

437456
fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
438-
// Extract the right-side keys from the `on` clauses
457+
// Extract the right-side keys (probe side keys) from the `on` clauses
458+
// Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
439459
let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
440460
// Initialize with a placeholder expression (true) that will be updated when the hash table is built
441461
Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
@@ -686,11 +706,14 @@ impl DisplayAs for HashJoinExec {
686706
.map(|(c1, c2)| format!("({c1}, {c2})"))
687707
.collect::<Vec<String>>()
688708
.join(", ");
689-
let dynamic_filter_display = match self.dynamic_filter.current() {
690-
Ok(current) if current != lit(true) => {
691-
format!(", filter=[{current}]")
692-
}
693-
_ => "".to_string(),
709+
let dynamic_filter_display = match self.dynamic_filter.as_ref() {
710+
Some(dynamic_filter) => match dynamic_filter.current() {
711+
Ok(current) if current != lit(true) => {
712+
format!(", filter=[{current}]")
713+
}
714+
_ => "".to_string(),
715+
},
716+
None => "".to_string(),
694717
};
695718
write!(
696719
f,
@@ -794,7 +817,7 @@ impl ExecutionPlan for HashJoinExec {
794817
self: Arc<Self>,
795818
children: Vec<Arc<dyn ExecutionPlan>>,
796819
) -> Result<Arc<dyn ExecutionPlan>> {
797-
let mut new_join = HashJoinExec::try_new(
820+
let new_join = HashJoinExec::try_new(
798821
Arc::clone(&children[0]),
799822
Arc::clone(&children[1]),
800823
self.on.clone(),
@@ -804,8 +827,6 @@ impl ExecutionPlan for HashJoinExec {
804827
self.mode,
805828
self.null_equality,
806829
)?;
807-
// Preserve the dynamic filter if it exists
808-
new_join.dynamic_filter = Arc::clone(&self.dynamic_filter);
809830
Ok(Arc::new(new_join))
810831
}
811832

@@ -818,15 +839,15 @@ impl ExecutionPlan for HashJoinExec {
818839
filter: self.filter.clone(),
819840
join_type: self.join_type,
820841
join_schema: Arc::clone(&self.join_schema),
821-
left_fut: OnceAsync::default(),
842+
left_fut: Arc::new(OnceAsync::default()),
822843
random_state: self.random_state.clone(),
823844
mode: self.mode,
824845
metrics: ExecutionPlanMetricsSet::new(),
825846
projection: self.projection.clone(),
826847
column_indices: self.column_indices.clone(),
827848
null_equality: self.null_equality,
828849
cache: self.cache.clone(),
829-
dynamic_filter: Self::create_dynamic_filter(&self.on),
850+
dynamic_filter: None,
830851
}))
831852
}
832853

@@ -886,7 +907,8 @@ impl ExecutionPlan for HashJoinExec {
886907
need_produce_result_in_final(self.join_type),
887908
self.right().output_partitioning().partition_count(),
888909
enable_dynamic_filter_pushdown
889-
.then_some(Arc::clone(&self.dynamic_filter)),
910+
.then_some(self.dynamic_filter.clone())
911+
.flatten(),
890912
on_right.clone(),
891913
))
892914
})?,
@@ -906,7 +928,8 @@ impl ExecutionPlan for HashJoinExec {
906928
need_produce_result_in_final(self.join_type),
907929
1,
908930
enable_dynamic_filter_pushdown
909-
.then_some(Arc::clone(&self.dynamic_filter)),
931+
.then_some(self.dynamic_filter.clone())
932+
.flatten(),
910933
on_right.clone(),
911934
))
912935
}
@@ -1050,8 +1073,7 @@ impl ExecutionPlan for HashJoinExec {
10501073
&& config.optimizer.enable_dynamic_filter_pushdown
10511074
{
10521075
// Add actual dynamic filter to right side (probe side)
1053-
let dynamic_filter =
1054-
Arc::clone(&self.dynamic_filter) as Arc<dyn PhysicalExpr>;
1076+
let dynamic_filter = Self::create_dynamic_filter(&self.on);
10551077
right_child = right_child.with_self_filter(dynamic_filter);
10561078
}
10571079

@@ -1078,7 +1100,40 @@ impl ExecutionPlan for HashJoinExec {
10781100
child_pushdown_result,
10791101
));
10801102
}
1081-
Ok(FilterPushdownPropagation::if_any(child_pushdown_result))
1103+
1104+
let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
1105+
assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
1106+
let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
1107+
// We expect 0 or 1 self filters
1108+
if let Some(filter) = right_child_self_filters.first() {
1109+
// Note that we don't check PushedDownPredicate::discriminant because even if nothing said
1110+
// "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
1111+
let predicate = Arc::clone(&filter.predicate);
1112+
if let Ok(dynamic_filter) =
1113+
Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
1114+
{
1115+
// We successfully pushed down our self filter - we need to make a new node with the dynamic filter
1116+
let new_node = Arc::new(HashJoinExec {
1117+
left: Arc::clone(&self.left),
1118+
right: Arc::clone(&self.right),
1119+
on: self.on.clone(),
1120+
filter: self.filter.clone(),
1121+
join_type: self.join_type,
1122+
join_schema: Arc::clone(&self.join_schema),
1123+
left_fut: Arc::clone(&self.left_fut),
1124+
random_state: self.random_state.clone(),
1125+
mode: self.mode,
1126+
metrics: ExecutionPlanMetricsSet::new(),
1127+
projection: self.projection.clone(),
1128+
column_indices: self.column_indices.clone(),
1129+
null_equality: self.null_equality,
1130+
cache: self.cache.clone(),
1131+
dynamic_filter: Some(dynamic_filter),
1132+
});
1133+
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
1134+
}
1135+
}
1136+
Ok(result)
10821137
}
10831138
}
10841139

datafusion/sqllogictest/test_files/push_down_filter.slt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
286286
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
287287

288288

289+
# Test dynamic filter pushdown with swapped join inputs (issue #17196)
290+
# Create tables with different sizes to force join input swapping
291+
statement ok
292+
copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
293+
294+
statement ok
295+
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
296+
297+
statement ok
298+
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
299+
300+
statement ok
301+
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
302+
303+
# Test that dynamic filter is applied to the correct table after join input swapping
304+
# The small_table should be the build side, large_table should be the probe side with dynamic filter
305+
query TT
306+
explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
307+
----
308+
physical_plan
309+
01)CoalesceBatchesExec: target_batch_size=8192
310+
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
311+
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
312+
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
313+
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
314+
315+
statement ok
316+
drop table small_table;
317+
318+
statement ok
319+
drop table large_table;
320+
289321
statement ok
290322
drop table t;

0 commit comments

Comments
 (0)