Skip to content

Commit 28c5406

Browse files
committed
use watch instead
1 parent dffd77b commit 28c5406

File tree

3 files changed

+52
-13
lines changed

3 files changed

+52
-13
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-expr/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ itertools = { workspace = true, features = ["use_std"] }
5555
parking_lot = { workspace = true }
5656
paste = "^1.0"
5757
petgraph = "0.8.3"
58+
tokio = { workspace = true }
5859

5960
[dev-dependencies]
6061
arrow = { workspace = true, features = ["test_utils"] }

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use parking_lot::RwLock;
1919
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
20+
use tokio::sync::watch;
2021

2122
use crate::PhysicalExpr;
2223
use arrow::datatypes::{DataType, Schema};
@@ -44,6 +45,10 @@ pub struct DynamicFilterPhysicalExpr {
4445
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
4546
/// The source of dynamic filters.
4647
inner: Arc<RwLock<Inner>>,
48+
/// Broadcasts completion state changes to all waiters.
49+
completion_watch: watch::Sender<bool>,
50+
/// Broadcasts update changes to all waiters.
51+
update_watch: watch::Sender<u64>,
4752
/// For testing purposes track the data type and nullability to make sure they don't change.
4853
/// If they do, there's a bug in the implementation.
4954
/// But this can have overhead in production, so it's only included in our tests.
@@ -57,8 +62,6 @@ struct Inner {
5762
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
5863
generation: u64,
5964
expr: Arc<dyn PhysicalExpr>,
60-
/// Flag indicating whether all updates have been received and the filter is complete.
61-
is_complete: bool,
6265
}
6366

6467
impl Inner {
@@ -68,7 +71,6 @@ impl Inner {
6871
// This is not currently used anywhere but it seems useful to have this simple distinction.
6972
generation: 1,
7073
expr,
71-
is_complete: false,
7274
}
7375
}
7476

@@ -137,10 +139,14 @@ impl DynamicFilterPhysicalExpr {
137139
children: Vec<Arc<dyn PhysicalExpr>>,
138140
inner: Arc<dyn PhysicalExpr>,
139141
) -> Self {
142+
let (completion_watch, _) = watch::channel(false);
143+
let (update_watch, _) = watch::channel(1u64);
140144
Self {
141145
children,
142146
remapped_children: None, // Initially no remapped children
143147
inner: Arc::new(RwLock::new(Inner::new(inner))),
148+
completion_watch,
149+
update_watch,
144150
data_type: Arc::new(RwLock::new(None)),
145151
nullable: Arc::new(RwLock::new(None)),
146152
}
@@ -188,7 +194,7 @@ impl DynamicFilterPhysicalExpr {
188194
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
189195
}
190196

191-
/// Update the current expression.
197+
/// Update the current expression and notify all waiters.
192198
/// Any children of this expression must be a subset of the original children
193199
/// passed to the constructor.
194200
/// This should be called e.g.:
@@ -207,27 +213,56 @@ impl DynamicFilterPhysicalExpr {
207213

208214
// Load the current inner, increment generation, and store the new one
209215
let mut current = self.inner.write();
216+
let new_generation = current.generation + 1;
210217
*current = Inner {
211-
generation: current.generation + 1,
218+
generation: new_generation,
212219
expr: new_expr,
213-
is_complete: current.is_complete,
214220
};
221+
drop(current); // Release the lock before broadcasting
222+
223+
// Broadcast the new generation to all waiters
224+
let _ = self.update_watch.send(new_generation);
215225
Ok(())
216226
}
217227

218-
/// Mark this dynamic filter as complete.
228+
/// Mark this dynamic filter as complete and broadcast to all waiters.
219229
///
220230
/// This signals that all expected updates have been received.
231+
/// Waiters using [`Self::wait_complete`] will be notified.
221232
pub fn mark_complete(&self) {
222-
let mut current = self.inner.write();
223-
current.is_complete = true;
233+
// Broadcast completion to all waiters
234+
let _ = self.completion_watch.send(true);
235+
}
236+
237+
/// Wait asynchronously for any update to this filter.
238+
///
239+
/// This method will return when [`Self::update`] is called.
240+
/// It does not guarantee that the filter is complete.
241+
pub async fn wait_update(&self) {
242+
let mut rx = self.update_watch.subscribe();
243+
// Wait for the generation to change
244+
let _ = rx.changed().await;
224245
}
225246

226-
/// Check if this dynamic filter is complete.
247+
/// Wait asynchronously until this dynamic filter is marked as complete.
248+
///
249+
/// This method returns immediately if the filter is already complete.
250+
/// Otherwise, it efficiently waits until [`Self::mark_complete`] is called.
251+
///
252+
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
253+
/// the filter is fully complete with no more updates expected.
254+
///
255+
/// # Example
227256
///
228-
/// Returns `true` if all expected updates have been received via [`Self::mark_complete`].
229-
pub fn is_complete(&self) -> bool {
230-
self.inner.read().is_complete
257+
/// ```ignore
258+
/// // wait for the final state
259+
/// dynamic_filter.wait_complete().await;
260+
/// // Now the filter is guaranteed to be complete
261+
/// ```
262+
pub async fn wait_complete(&self) {
263+
let mut rx = self.completion_watch.subscribe();
264+
// Wait until the completion flag becomes true
265+
let _ = rx.wait_for(|&complete| complete).await;
231266
}
232267

233268
fn render(
@@ -272,6 +307,8 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
272307
children: self.children.clone(),
273308
remapped_children: Some(children),
274309
inner: Arc::clone(&self.inner),
310+
completion_watch: self.completion_watch.clone(),
311+
update_watch: self.update_watch.clone(),
275312
data_type: Arc::clone(&self.data_type),
276313
nullable: Arc::clone(&self.nullable),
277314
}))

0 commit comments

Comments
 (0)