Skip to content

Commit 2673e8c

Browse files
committed
Drop implementation for Tracker
This allows the tracker to log reachability information when dropping dataflows. Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
1 parent 036b4da commit 2673e8c

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

timely/src/progress/frontier.rs

+8
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,14 @@ impl<T> MutableAntichain<T> {
528528
.map(|td| td.1)
529529
.sum()
530530
}
531+
532+
pub(crate) fn updates(&mut self) -> impl Iterator<Item=&(T, i64)>
533+
where
534+
T: Clone + PartialOrder + Ord,
535+
{
536+
self.rebuild();
537+
self.updates.iter()
538+
}
531539
}
532540

533541
impl<T> Default for MutableAntichain<T> {

timely/src/progress/reachability.rs

+33-1
Original file line numberDiff line numberDiff line change
@@ -897,4 +897,36 @@ pub mod logging {
897897
impl From<TargetUpdate> for TrackerEvent {
898898
fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) }
899899
}
900-
}
900+
}
901+
902+
impl<T: Timestamp> Drop for Tracker<T> {
903+
fn drop(&mut self) {
904+
let Some(logger) = &mut self.logger else { return; };
905+
for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
906+
let target_changes = per_operator.targets
907+
.iter_mut()
908+
.enumerate()
909+
.flat_map(|(port, target)| {
910+
target.pointstamps
911+
.updates()
912+
.map(move |(time, diff)| (index, port, time.clone(), -diff))
913+
})
914+
.collect::<Vec<_>>();
915+
if !target_changes.is_empty() {
916+
logger.log_target_updates(Box::new(target_changes));
917+
}
918+
let source_changes = per_operator.sources
919+
.iter_mut()
920+
.enumerate()
921+
.flat_map(|(port, source)| {
922+
source.pointstamps
923+
.updates()
924+
.map(move |(time, diff)| (index, port, time.clone(), -diff))
925+
})
926+
.collect::<Vec<_>>();
927+
if !source_changes.is_empty() {
928+
logger.log_source_updates(Box::new(source_changes));
929+
}
930+
}
931+
}
932+
}

0 commit comments

Comments
 (0)