Skip to content

Commit c10ef72

Browse files
committed
Add test on empty build side and test on multiple waiters
1 parent c6a91b6 commit c10ef72

File tree

2 files changed

+76
-0
lines changed

2 files changed

+76
-0
lines changed

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,4 +607,32 @@ mod test {
607607
// wait_complete should return immediately
608608
dynamic_filter.wait_complete().await;
609609
}
610+
611+
#[tokio::test]
612+
async fn test_multiple_waiters() {
613+
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
614+
vec![],
615+
lit(42) as Arc<dyn PhysicalExpr>,
616+
));
617+
618+
// Create multiple tasks waiting for completion
619+
let df1 = Arc::clone(&dynamic_filter);
620+
let df2 = Arc::clone(&dynamic_filter);
621+
let df3 = Arc::clone(&dynamic_filter);
622+
623+
let waiter1 = tokio::spawn(async move { df1.wait_complete().await });
624+
let waiter2 = tokio::spawn(async move { df2.wait_complete().await });
625+
let waiter3 = tokio::spawn(async move { df3.wait_complete().await });
626+
627+
// Small delay to ensure all waiters are subscribed
628+
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
629+
630+
// Mark as complete - all waiters should be notified
631+
dynamic_filter.mark_complete();
632+
633+
// All waiters should complete successfully
634+
waiter1.await.unwrap();
635+
waiter2.await.unwrap();
636+
waiter3.await.unwrap();
637+
}
610638
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4545,4 +4545,52 @@ mod tests {
45454545

45464546
Ok(())
45474547
}
4548+
4549+
/// This test verifies that the dynamic filter is marked as complete even when the build side is empty.
4550+
#[tokio::test]
4551+
async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
4552+
let task_ctx = Arc::new(TaskContext::default());
4553+
// Empty left side (build side)
4554+
let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
4555+
let right = build_table(
4556+
("a2", &vec![10, 20, 30]),
4557+
("b1", &vec![4, 5, 6]),
4558+
("c2", &vec![70, 80, 90]),
4559+
);
4560+
4561+
let on = vec![(
4562+
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4563+
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4564+
)];
4565+
4566+
// Create a dynamic filter manually
4567+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4568+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4569+
4570+
// Create HashJoinExec with the dynamic filter
4571+
let mut join = HashJoinExec::try_new(
4572+
left,
4573+
right,
4574+
on,
4575+
None,
4576+
&JoinType::Inner,
4577+
None,
4578+
PartitionMode::CollectLeft,
4579+
NullEquality::NullEqualsNothing,
4580+
)?;
4581+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4582+
filter: dynamic_filter,
4583+
bounds_accumulator: OnceLock::new(),
4584+
});
4585+
4586+
// Execute the join
4587+
let stream = join.execute(0, task_ctx)?;
4588+
let _batches = common::collect(stream).await?;
4589+
4590+
// Even with empty build side, the dynamic filter should be marked as complete
4591+
// wait_complete() should return immediately
4592+
dynamic_filter_clone.wait_complete().await;
4593+
4594+
Ok(())
4595+
}
45484596
}

0 commit comments

Comments
 (0)