Skip to content

Commit 67cc93b

Browse files
committed
Add field to DynamicPhysicalExpr to indicate when the filter is complete or updated (apache#18799)
<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes #. Dynamic filter pushdown in DataFusion currently lacks an API to determine when filters are "complete" (all contributing partitions have reported), this creates an ambiguity issue where it's impossible to differentiate between: 1. **Complete filter with no data**: Build side produced 0 rows, filter remains as placeholder `lit(true)`, no more updates coming 2. **Incomplete filter**: Filter is still being computed, updates are pending I think this could be especially useful when we want to make the filter updates progressively in the future. - Calls `mark_complete()` after barrier completes, regardless of whether bounds exist. - Exposes` is_complete() f`unction on the `DynamicFilterPhysicalExpr`. I didn't add any tests because the change is minimal , and comprehensive testing would require making the `DynamicFilterPhysicalExpr` public or running through the full optimizer pipeline. Exposing is_complete() function. (cherry picked from commit 7fa2a69)
1 parent f209f98 commit 67cc93b

File tree

6 files changed

+261
-7
lines changed

6 files changed

+261
-7
lines changed

Cargo.lock

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-expr/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ itertools = { workspace = true, features = ["use_std"] }
5252
log = { workspace = true }
5353
parking_lot = { workspace = true }
5454
paste = "^1.0"
55-
petgraph = "0.8.2"
55+
petgraph = "0.8.3"
56+
tokio = { workspace = true }
5657

5758
[dev-dependencies]
5859
arrow = { workspace = true, features = ["test_utils"] }
@@ -77,3 +78,6 @@ name = "is_null"
7778
[[bench]]
7879
harness = false
7980
name = "binary_op"
81+
82+
[package.metadata.cargo-machete]
83+
ignored = ["half"]

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use parking_lot::RwLock;
1919
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
20+
use tokio::sync::watch;
2021

2122
use crate::PhysicalExpr;
2223
use arrow::datatypes::{DataType, Schema};
@@ -27,6 +28,24 @@ use datafusion_common::{
2728
use datafusion_expr::ColumnarValue;
2829
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
2930

31+
/// State of a dynamic filter, tracking both updates and completion.
32+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33+
enum FilterState {
34+
/// Filter is in progress and may receive more updates.
35+
InProgress { generation: u64 },
36+
/// Filter is complete and will not receive further updates.
37+
Complete { generation: u64 },
38+
}
39+
40+
impl FilterState {
41+
fn generation(&self) -> u64 {
42+
match self {
43+
FilterState::InProgress { generation }
44+
| FilterState::Complete { generation } => *generation,
45+
}
46+
}
47+
}
48+
3049
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
3150
///
3251
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
@@ -44,6 +63,8 @@ pub struct DynamicFilterPhysicalExpr {
4463
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
4564
/// The source of dynamic filters.
4665
inner: Arc<RwLock<Inner>>,
66+
/// Broadcasts filter state (updates and completion) to all waiters.
67+
state_watch: watch::Sender<FilterState>,
4768
/// For testing purposes track the data type and nullability to make sure they don't change.
4869
/// If they do, there's a bug in the implementation.
4970
/// But this can have overhead in production, so it's only included in our tests.
@@ -57,6 +78,10 @@ struct Inner {
5778
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
5879
generation: u64,
5980
expr: Arc<dyn PhysicalExpr>,
81+
/// Flag for quick synchronous check if filter is complete.
82+
/// This is redundant with the watch channel state, but allows us to return immediately
83+
/// from `wait_complete()` without subscribing if already complete.
84+
is_complete: bool,
6085
}
6186

6287
impl Inner {
@@ -66,6 +91,7 @@ impl Inner {
6691
// This is not currently used anywhere but it seems useful to have this simple distinction.
6792
generation: 1,
6893
expr,
94+
is_complete: false,
6995
}
7096
}
7197

@@ -135,10 +161,12 @@ impl DynamicFilterPhysicalExpr {
135161
children: Vec<Arc<dyn PhysicalExpr>>,
136162
inner: Arc<dyn PhysicalExpr>,
137163
) -> Self {
164+
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
138165
Self {
139166
children,
140167
remapped_children: None, // Initially no remapped children
141168
inner: Arc::new(RwLock::new(Inner::new(inner))),
169+
state_watch,
142170
data_type: Arc::new(RwLock::new(None)),
143171
nullable: Arc::new(RwLock::new(None)),
144172
}
@@ -181,7 +209,7 @@ impl DynamicFilterPhysicalExpr {
181209
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
182210
}
183211

184-
/// Update the current expression.
212+
/// Update the current expression and notify all waiters.
185213
/// Any children of this expression must be a subset of the original children
186214
/// passed to the constructor.
187215
/// This should be called e.g.:
@@ -200,12 +228,67 @@ impl DynamicFilterPhysicalExpr {
200228

201229
// Load the current inner, increment generation, and store the new one
202230
let mut current = self.inner.write();
231+
let new_generation = current.generation + 1;
203232
*current = Inner {
204-
generation: current.generation + 1,
233+
generation: new_generation,
205234
expr: new_expr,
235+
is_complete: current.is_complete,
206236
};
237+
drop(current); // Release the lock before broadcasting
238+
239+
// Broadcast the new state to all waiters
240+
let _ = self.state_watch.send(FilterState::InProgress {
241+
generation: new_generation,
242+
});
207243
Ok(())
208244
}
245+
246+
/// Mark this dynamic filter as complete and broadcast to all waiters.
247+
///
248+
/// This signals that all expected updates have been received.
249+
/// Waiters using [`Self::wait_complete`] will be notified.
250+
pub fn mark_complete(&self) {
251+
let mut current = self.inner.write();
252+
let current_generation = current.generation;
253+
current.is_complete = true;
254+
drop(current);
255+
256+
// Broadcast completion to all waiters
257+
let _ = self.state_watch.send(FilterState::Complete {
258+
generation: current_generation,
259+
});
260+
}
261+
262+
/// Wait asynchronously for any update to this filter.
263+
///
264+
/// This method will return when [`Self::update`] is called and the generation increases.
265+
/// It does not guarantee that the filter is complete.
266+
pub async fn wait_update(&self) {
267+
let mut rx = self.state_watch.subscribe();
268+
// Get the current generation
269+
let current_gen = rx.borrow_and_update().generation();
270+
271+
// Wait until generation increases
272+
let _ = rx.wait_for(|state| state.generation() > current_gen).await;
273+
}
274+
275+
/// Wait asynchronously until this dynamic filter is marked as complete.
276+
///
277+
/// This method returns immediately if the filter is already complete.
278+
/// Otherwise, it waits until [`Self::mark_complete`] is called.
279+
///
280+
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
281+
/// the filter is fully complete with no more updates expected.
282+
pub async fn wait_complete(&self) {
283+
if self.inner.read().is_complete {
284+
return;
285+
}
286+
287+
let mut rx = self.state_watch.subscribe();
288+
let _ = rx
289+
.wait_for(|state| matches!(state, FilterState::Complete { .. }))
290+
.await;
291+
}
209292
}
210293

211294
impl PhysicalExpr for DynamicFilterPhysicalExpr {
@@ -229,6 +312,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
229312
children: self.children.clone(),
230313
remapped_children: Some(children),
231314
inner: Arc::clone(&self.inner),
315+
state_watch: self.state_watch.clone(),
232316
data_type: Arc::clone(&self.data_type),
233317
nullable: Arc::clone(&self.nullable),
234318
}))
@@ -488,4 +572,18 @@ mod test {
488572
"Expected err when evaluate is called after changing the expression."
489573
);
490574
}
575+
576+
#[tokio::test]
577+
async fn test_wait_complete_already_complete() {
578+
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
579+
vec![],
580+
lit(42) as Arc<dyn PhysicalExpr>,
581+
));
582+
583+
// Mark as complete immediately
584+
dynamic_filter.mark_complete();
585+
586+
// wait_complete should return immediately
587+
dynamic_filter.wait_complete().await;
588+
}
491589
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4500,4 +4500,103 @@ mod tests {
45004500
fn columns(schema: &Schema) -> Vec<String> {
45014501
schema.fields().iter().map(|f| f.name().clone()).collect()
45024502
}
4503+
4504+
/// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table.
4505+
#[tokio::test]
4506+
async fn test_hash_join_marks_filter_complete() -> Result<()> {
4507+
let task_ctx = Arc::new(TaskContext::default());
4508+
let left = build_table(
4509+
("a1", &vec![1, 2, 3]),
4510+
("b1", &vec![4, 5, 6]),
4511+
("c1", &vec![7, 8, 9]),
4512+
);
4513+
let right = build_table(
4514+
("a2", &vec![10, 20, 30]),
4515+
("b1", &vec![4, 5, 6]),
4516+
("c2", &vec![70, 80, 90]),
4517+
);
4518+
4519+
let on = vec![(
4520+
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4521+
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4522+
)];
4523+
4524+
// Create a dynamic filter manually
4525+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4526+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4527+
4528+
// Create HashJoinExec with the dynamic filter
4529+
let mut join = HashJoinExec::try_new(
4530+
left,
4531+
right,
4532+
on,
4533+
None,
4534+
&JoinType::Inner,
4535+
None,
4536+
PartitionMode::CollectLeft,
4537+
NullEquality::NullEqualsNothing,
4538+
)?;
4539+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4540+
filter: dynamic_filter,
4541+
bounds_accumulator: OnceLock::new(),
4542+
});
4543+
4544+
// Execute the join
4545+
let stream = join.execute(0, task_ctx)?;
4546+
let _batches = common::collect(stream).await?;
4547+
4548+
// After the join completes, the dynamic filter should be marked as complete
4549+
// wait_complete() should return immediately
4550+
dynamic_filter_clone.wait_complete().await;
4551+
4552+
Ok(())
4553+
}
4554+
4555+
/// This test verifies that the dynamic filter is marked as complete even when the build side is empty.
4556+
#[tokio::test]
4557+
async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> {
4558+
let task_ctx = Arc::new(TaskContext::default());
4559+
// Empty left side (build side)
4560+
let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![]));
4561+
let right = build_table(
4562+
("a2", &vec![10, 20, 30]),
4563+
("b1", &vec![4, 5, 6]),
4564+
("c2", &vec![70, 80, 90]),
4565+
);
4566+
4567+
let on = vec![(
4568+
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
4569+
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
4570+
)];
4571+
4572+
// Create a dynamic filter manually
4573+
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
4574+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
4575+
4576+
// Create HashJoinExec with the dynamic filter
4577+
let mut join = HashJoinExec::try_new(
4578+
left,
4579+
right,
4580+
on,
4581+
None,
4582+
&JoinType::Inner,
4583+
None,
4584+
PartitionMode::CollectLeft,
4585+
NullEquality::NullEqualsNothing,
4586+
)?;
4587+
join.dynamic_filter = Some(HashJoinExecDynamicFilter {
4588+
filter: dynamic_filter,
4589+
bounds_accumulator: OnceLock::new(),
4590+
});
4591+
4592+
// Execute the join
4593+
let stream = join.execute(0, task_ctx)?;
4594+
let _batches = common::collect(stream).await?;
4595+
4596+
// Even with empty build side, the dynamic filter should be marked as complete
4597+
// wait_complete() should return immediately
4598+
dynamic_filter_clone.wait_complete().await;
4599+
4600+
Ok(())
4601+
}
45034602
}

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ impl SharedBoundsAccumulator {
283283
if completed == total_partitions && !inner.bounds.is_empty() {
284284
let filter_expr = self.create_filter_from_partition_bounds(&inner.bounds)?;
285285
self.dynamic_filter.update(filter_expr)?;
286+
self.dynamic_filter.mark_complete();
286287
}
287288

288289
Ok(())

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,10 +589,13 @@ impl TopK {
589589
common_sort_prefix_converter: _,
590590
common_sort_prefix: _,
591591
finished: _,
592-
filter: _,
592+
filter,
593593
} = self;
594594
let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
595595

596+
// Mark the dynamic filter as complete now that TopK processing is finished.
597+
filter.read().expr().mark_complete();
598+
596599
// break into record batches as needed
597600
let mut batches = vec![];
598601
if let Some(mut batch) = heap.emit()? {
@@ -1197,4 +1200,52 @@ mod tests {
11971200

11981201
Ok(())
11991202
}
1203+
1204+
/// This test verifies that the dynamic filter is marked as complete after TopK processing finishes.
1205+
#[tokio::test]
1206+
async fn test_topk_marks_filter_complete() -> Result<()> {
1207+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1208+
1209+
let sort_expr = PhysicalSortExpr {
1210+
expr: col("a", schema.as_ref())?,
1211+
options: SortOptions::default(),
1212+
};
1213+
1214+
let full_expr = LexOrdering::from([sort_expr.clone()]);
1215+
let prefix = vec![sort_expr];
1216+
1217+
// Create a dummy runtime environment and metrics
1218+
let runtime = Arc::new(RuntimeEnv::default());
1219+
let metrics = ExecutionPlanMetricsSet::new();
1220+
1221+
// Create a dynamic filter that we'll check for completion
1222+
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true)));
1223+
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
1224+
1225+
// Create a TopK instance
1226+
let mut topk = TopK::try_new(
1227+
0,
1228+
Arc::clone(&schema),
1229+
prefix,
1230+
full_expr,
1231+
2,
1232+
10,
1233+
runtime,
1234+
&metrics,
1235+
Arc::new(RwLock::new(TopKDynamicFilters::new(dynamic_filter))),
1236+
)?;
1237+
1238+
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1), Some(2)]));
1239+
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;
1240+
topk.insert_batch(batch)?;
1241+
1242+
// Call emit to finish TopK processing
1243+
let _results: Vec<_> = topk.emit()?.try_collect().await?;
1244+
1245+
// After emit is called, the dynamic filter should be marked as complete
1246+
// wait_complete() should return immediately
1247+
dynamic_filter_clone.wait_complete().await;
1248+
1249+
Ok(())
1250+
}
12001251
}

0 commit comments

Comments
 (0)