From 44bdd05399311652ba176cbdf0198e3eeb33f2a5 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 20 May 2024 16:39:41 -0400 Subject: [PATCH] Address feedback Signed-off-by: Moritz Hoffmann --- src/operators/arrange/arrangement.rs | 7 ++++--- src/operators/reduce.rs | 22 ++++++++++++---------- src/trace/implementations/mod.rs | 22 +++++++++------------- src/trace/implementations/ord_neu.rs | 6 ++---- src/trace/implementations/rhh.rs | 3 +-- src/trace/mod.rs | 5 ++--- 6 files changed, 30 insertions(+), 35 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 24a844eff..d8fb9720d 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -75,7 +75,8 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; -use timely::container::{PushContainer, PushInto}; +use timely::Container; +use timely::container::PushInto; impl Arranged where @@ -293,7 +294,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - ::Input: PushContainer, + ::Input: Container, ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { @@ -313,7 +314,7 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - ::Input: PushContainer, + ::Input: Container, ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index b916ac65f..8711e66a0 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -5,7 +5,8 @@ //! to the key and the list of values. //! The function is expected to populate a list of output values. -use timely::container::{PushContainer, PushInto}; +use timely::Container; +use timely::container::PushInto; use crate::hashable::Hashable; use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian}; @@ -313,7 +314,7 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - ::Input: PushContainer, + ::Input: Container, ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { @@ -450,12 +451,14 @@ where // TODO: It would be better if all updates went into one batch, but timely dataflow prevents // this as long as it requires that there is only one capability for each message. let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new(); - let mut chains = Vec::new(); + let mut builders = Vec::new(); for cap in capabilities.iter() { buffers.push((cap.time().clone(), Vec::new())); - chains.push(Default::default()); + builders.push(T2::Builder::new()); } + let mut buffer = Default::default(); + // cursors for navigating input and output traces. let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); let source_storage = &source_storage; @@ -533,8 +536,9 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - // TODO(antiguru): This is dumb. Need to stage data and then reveal it. - ((key.into_owned(), val), time, diff).push_into(&mut chains[index]); + ((key.into_owned(), val), time, diff).push_into(&mut buffer); + builders[index].push(&mut buffer); + buffer.clear(); } } } @@ -546,10 +550,8 @@ where output_lower.extend(lower_limit.borrow().iter().cloned()); // build and ship each batch (because only one capability per message). - for (index, mut chain) in chains.drain(..).enumerate() { - let mut builder = T2::Builder::new(); - // TODO(antiguru): Form actual chains. - builder.push(&mut chain); + for (index, builder) in builders.drain(..).enumerate() { + // Form the upper limit of the next batch, which includes all times greater // than the input batch, or the capabilities from i + 1 onward. output_upper.clear(); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 41373a25c..0f8985c6a 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -161,8 +161,8 @@ impl Update for Preferred where K: ToOwned + ?Sized, K::Owned: Ord+Clone+'static, - V: ToOwned + ?Sized + 'static, - V::Owned: Ord+Clone, + V: ToOwned + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, { @@ -177,8 +177,8 @@ where K: Ord+ToOwned+PreferredContainer + ?Sized, K::Owned: Ord+Clone+'static, // for<'a> K::Container: BatchContainer = &'a K>, - V: Ord+ToOwned+PreferredContainer + ?Sized + 'static, - V::Owned: Ord+Clone, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, D: Semigroup+Clone, { @@ -192,6 +192,7 @@ where use std::convert::TryInto; use std::ops::Deref; use abomonation_derive::Abomonation; +use timely::Container; use timely::container::PushInto; use timely::progress::Timestamp; use crate::trace::cursor::MyTrait; @@ -364,9 +365,7 @@ impl BatchContainer for OffsetList { } /// Behavior to split an update into principal components. -pub trait BuilderInput { - /// The item to break apart. - type Item<'a>; +pub trait BuilderInput: Container { /// Key portion type Key<'a>: Ord; /// Value portion @@ -393,7 +392,6 @@ where T: Timestamp + Lattice + Clone + 'static, R: Semigroup + Clone + 'static, { - type Item<'a> = ((K, V), T, R); type Key<'a> = K; type Val<'a> = V; type Time = T; @@ -419,7 +417,6 @@ where T: Timestamp + Lattice + Columnation + Clone + 'static, R: Semigroup + Columnation + Clone + 'static, { - type Item<'a> = &'a ((K, V), T, R); type Key<'a> = &'a K; type Val<'a> = &'a V; type Time = T; @@ -442,12 +439,11 @@ impl BuilderInput> for TimelyStack<(( = &'a ((::Owned, ::Owned), T, R); type Key<'a> = &'a K::Owned; type Val<'a> = &'a V::Owned; type Time = T; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index d96830575..ddc8a4409 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -69,7 +69,6 @@ mod val_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; - use timely::Container; use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -544,7 +543,7 @@ mod val_batch { impl Builder for OrdValBuilder where L: Layout, - CI: Container + for<'a> BuilderInput = ::Item<'a>, Time=::Time, Diff=::Diff>, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, for<'a> CI::Key<'a>: PushInto, for<'a> CI::Val<'a>: PushInto, { @@ -618,7 +617,6 @@ mod key_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; - use timely::Container; use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -990,7 +988,7 @@ mod key_batch { impl Builder for OrdKeyBuilder where L: Layout, - CI: Container + for<'a> BuilderInput = ::Item<'a>, Time=::Time, Diff=::Diff>, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, for<'a> CI::Key<'a>: PushInto, { diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 67585f60d..c8315b0cb 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -80,7 +80,6 @@ mod val_batch { use std::convert::TryInto; use std::marker::PhantomData; use abomonation_derive::Abomonation; - use timely::Container; use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -740,7 +739,7 @@ mod val_batch { where ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, - CI: Container + for<'a> BuilderInput = ::Item<'a>, Key<'a> = ::Key, Time=::Time, Diff=::Diff>, + CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, for<'a> CI::Val<'a>: PushInto, { type Input = CI; diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 9d5e6cbf8..00b72fc1d 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -336,10 +336,9 @@ pub trait Builder: Sized { /// /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates. fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self; - /// Adds an element to the batch. + /// Adds a chunk of elements to the batch. /// - /// The default implementation uses `self.copy` with references to the owned arguments. - /// One should override it if the builder can take advantage of owned arguments. + /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state. fn push(&mut self, chunk: &mut Self::Input); /// Completes building and returns the batch. fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output;