Skip to content

Commit dffd77b

Browse files
committed
Add field to DynamicPhysicalExpr to indicate its complete
1 parent e4bc514 commit dffd77b

File tree

3 files changed

+24
-1
lines changed

3 files changed

+24
-1
lines changed

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ struct Inner {
5757
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
5858
generation: u64,
5959
expr: Arc<dyn PhysicalExpr>,
60+
/// Flag indicating whether all updates have been received and the filter is complete.
61+
is_complete: bool,
6062
}
6163

6264
impl Inner {
@@ -66,6 +68,7 @@ impl Inner {
6668
// This is not currently used anywhere but it seems useful to have this simple distinction.
6769
generation: 1,
6870
expr,
71+
is_complete: false,
6972
}
7073
}
7174

@@ -207,10 +210,26 @@ impl DynamicFilterPhysicalExpr {
207210
*current = Inner {
208211
generation: current.generation + 1,
209212
expr: new_expr,
213+
is_complete: current.is_complete,
210214
};
211215
Ok(())
212216
}
213217

218+
/// Mark this dynamic filter as complete.
219+
///
220+
/// This signals that all expected updates have been received.
221+
pub fn mark_complete(&self) {
222+
let mut current = self.inner.write();
223+
current.is_complete = true;
224+
}
225+
226+
/// Check if this dynamic filter is complete.
227+
///
228+
/// Returns `true` if all expected updates have been received via [`Self::mark_complete`].
229+
pub fn is_complete(&self) -> bool {
230+
self.inner.read().is_complete
231+
}
232+
214233
fn render(
215234
&self,
216235
f: &mut std::fmt::Formatter<'_>,

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ impl SharedBoundsAccumulator {
300300
self.create_filter_from_partition_bounds(&inner.bounds)?;
301301
self.dynamic_filter.update(filter_expr)?;
302302
}
303+
self.dynamic_filter.mark_complete();
303304
}
304305

305306
Ok(())

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,10 +591,13 @@ impl TopK {
591591
common_sort_prefix_converter: _,
592592
common_sort_prefix: _,
593593
finished: _,
594-
filter: _,
594+
filter,
595595
} = self;
596596
let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop
597597

598+
// Mark the dynamic filter as complete now that TopK processing is finished.
599+
filter.read().expr().mark_complete();
600+
598601
// break into record batches as needed
599602
let mut batches = vec![];
600603
if let Some(mut batch) = heap.emit()? {

0 commit comments

Comments
 (0)