diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 13bbb915b..24a844eff 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -75,6 +75,7 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; +use timely::container::{PushContainer, PushInto}; impl Arranged where @@ -292,7 +293,8 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: PushContainer, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { @@ -311,7 +313,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: PushContainer, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 758ec8df3..65cdc0b4b 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -138,7 +138,7 @@ where F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder>, { let mut reader: Option> = None; @@ -282,9 +282,7 @@ where } // Must insert updates in (key, val, time) order. updates.sort(); - for update in updates.drain(..) 
{ - builder.push(update); - } + builder.push(&mut updates); } let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); prev_frontier.clone_from(&upper); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index dda549bca..b916ac65f 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -5,6 +5,7 @@ //! to the key and the list of values. //! The function is expected to populate a list of output values. +use timely::container::{PushContainer, PushInto}; use crate::hashable::Hashable; use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian}; @@ -252,7 +253,7 @@ pub trait ReduceCore where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { @@ -274,7 +275,7 @@ pub trait ReduceCore where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -293,7 +294,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -312,7 +313,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: PushContainer, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -448,10 +450,10 @@ where // TODO: It would be better if all updates went into one 
batch, but timely dataflow prevents // this as long as it requires that there is only one capability for each message. let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new(); - let mut builders = Vec::new(); + let mut chains = Vec::new(); for cap in capabilities.iter() { buffers.push((cap.time().clone(), Vec::new())); - builders.push(T2::Builder::new()); + chains.push(Default::default()); } // cursors for navigating input and output traces. @@ -531,7 +533,8 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - builders[index].push(((key.into_owned(), val), time, diff)); + // TODO(antiguru): This is dumb. Need to stage data and then reveal it. + ((key.into_owned(), val), time, diff).push_into(&mut chains[index]); } } } @@ -543,8 +546,10 @@ where output_lower.extend(lower_limit.borrow().iter().cloned()); // build and ship each batch (because only one capability per message). - for (index, builder) in builders.drain(..).enumerate() { - + for (index, mut chain) in chains.drain(..).enumerate() { + let mut builder = T2::Builder::new(); + // TODO(antiguru): Form actual chains. + builder.push(&mut chain); // Form the upper limit of the next batch, which includes all times greater // than the input batch, or the capabilities from i + 1 onward. output_upper.clear(); @@ -648,7 +653,7 @@ where where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -728,7 +733,7 @@ mod history_replay { where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -1020,7 +1025,7 @@ mod history_replay { new_interesting.push(next_time.clone()); debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time))) } - + // Update `meet` to track the meet of each source of times. 
meet = None;//T::maximum(); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 45e2a60f8..bb13cf650 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -282,7 +282,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = Vec<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = Vec<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -497,8 +497,8 @@ where } let mut builder = B::with_capacity(keys, vals, upds); - for datum in chain.drain(..).flatten() { - builder.push(datum); + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index aed0039d8..265f2e649 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -67,7 +67,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = TimelyStack<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = TimelyStack<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -290,11 +290,8 @@ where } } let mut builder = B::with_capacity(keys, vals, upds); - - for chunk in chain.drain(..) { - for datum in chunk.iter() { - builder.copy(datum); - } + for mut chunk in chain.drain(..) 
{ + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index cd0b8fd9a..41373a25c 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -138,7 +138,7 @@ where /// A type with a preferred container. /// -/// Examples include types that implement `Clone` who prefer +/// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. type Container: BatchContainer; @@ -192,10 +192,12 @@ where use std::convert::TryInto; use std::ops::Deref; use abomonation_derive::Abomonation; +use timely::container::PushInto; +use timely::progress::Timestamp; use crate::trace::cursor::MyTrait; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)] +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)] pub struct OffsetList { /// Length of a prefix of zero elements. pub zero_prefix: usize, @@ -205,6 +207,12 @@ pub struct OffsetList { pub chonk: Vec, } +impl std::fmt::Debug for OffsetList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_list().entries(self.into_iter()).finish() + } +} + impl OffsetList { /// Allocate a new list with a specified capacity. pub fn with_capacity(cap: usize) -> Self { @@ -222,7 +230,7 @@ impl OffsetList { else if self.chonk.is_empty() { if let Ok(smol) = offset.try_into() { self.smol.push(smol); - } + } else { self.chonk.push(offset.try_into().unwrap()) } @@ -249,6 +257,41 @@ impl OffsetList { } } +impl<'a> IntoIterator for &'a OffsetList { + type Item = usize; + type IntoIter = OffsetListIter<'a>; + + fn into_iter(self) -> Self::IntoIter { + OffsetListIter {list: self, index: 0 } + } +} + +/// An iterator for [`OffsetList`]. 
+pub struct OffsetListIter<'a> { + list: &'a OffsetList, + index: usize, +} + +impl<'a> Iterator for OffsetListIter<'a> { + type Item = usize; + + fn next(&mut self) -> Option { + if self.index < self.list.len() { + let res = Some(self.list.index(self.index)); + self.index += 1; + res + } else { + None + } + } +} + +impl PushInto for usize { + fn push_into(self, target: &mut OffsetList) { + target.push(self); + } +} + /// Helper struct to provide `MyTrait` for `Copy` types. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] pub struct Wrapper(T); @@ -320,6 +363,109 @@ impl BatchContainer for OffsetList { } } +/// Behavior to split an update into principal components. +pub trait BuilderInput { + /// The item to break apart. + type Item<'a>; + /// Key portion + type Key<'a>: Ord; + /// Value portion + type Val<'a>: Ord; + /// Time + type Time; + /// Diff + type Diff; + + /// Split an item into separate parts. + fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff); + + /// Test that the key equals a key in the layout's key container. + fn key_eq(this: &Self::Key<'_>, other: ::ReadItem<'_>) -> bool; + + /// Test that the value equals a value in the layout's value container. 
+ fn val_eq(this: &Self::Val<'_>, other: ::ReadItem<'_>) -> bool; +} + +impl BuilderInput> for Vec<((K, V), T, R)> +where + K: Ord + Clone + 'static, + V: Ord + Clone + 'static, + T: Timestamp + Lattice + Clone + 'static, + R: Semigroup + Clone + 'static, +{ + type Item<'a> = ((K, V), T, R); + type Key<'a> = K; + type Val<'a> = V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time, diff) + } + + fn key_eq(this: &K, other: &K) -> bool { + this == other + } + + fn val_eq(this: &V, other: &V) -> bool { + this == other + } +} + +impl BuilderInput> for TimelyStack<((K, V), T, R)> +where + K: Ord + Columnation + Clone + 'static, + V: Ord + Columnation + Clone + 'static, + T: Timestamp + Lattice + Columnation + Clone + 'static, + R: Semigroup + Columnation + Clone + 'static, +{ + type Item<'a> = &'a ((K, V), T, R); + type Key<'a> = &'a K; + type Val<'a> = &'a V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K, other: &K) -> bool { + *this == other + } + + fn val_eq(this: &&V, other: &V) -> bool { + *this == other + } +} + +impl BuilderInput> for TimelyStack<((::Owned, ::Owned), T, R)> +where + K: Ord+ToOwned+PreferredContainer + ?Sized, + K::Owned: Columnation + Ord+Clone+'static, + V: Ord+ToOwned+PreferredContainer + ?Sized + 'static, + V::Owned: Columnation + Ord+Clone, + T: Columnation + Ord+Lattice+timely::progress::Timestamp+Clone, + R: Columnation + Semigroup+Clone, +{ + type Item<'a> = &'a ((::Owned, ::Owned), T, R); + type Key<'a> = &'a K::Owned; + type Val<'a> = &'a V::Owned; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), 
diff.clone()) + } + + fn key_eq(this: &&K::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.equals(this) + } + + fn val_eq(this: &&V::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.equals(this) + } +} + pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2}; /// Containers for data that resemble `Vec`, with leaner implementations. @@ -498,6 +644,12 @@ pub mod containers { inner: Vec, } + impl PushInto> for &[B] { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) + } + } + impl BatchContainer for SliceContainer where B: Ord + Clone + Sized + 'static, @@ -572,6 +724,12 @@ pub mod containers { inner: Vec, } + impl PushInto> for &Vec { + fn push_into(self, target: &mut SliceContainer2) { + target.copy_push(self) + } + } + /// Welcome to GATs! pub struct Greetings<'a, B> { /// Text that decorates the data. @@ -581,11 +739,13 @@ pub mod containers { } impl<'a, B> Copy for Greetings<'a, B> { } - impl<'a, B> Clone for Greetings<'a, B> { + impl<'a, B> Clone for Greetings<'a, B> { fn clone(&self) -> Self { *self } } use std::cmp::Ordering; + use timely::container::PushInto; + impl<'a, 'b, B: Ord> PartialEq> for Greetings<'b, B> { fn eq(&self, other: &Greetings<'a, B>) -> bool { self.slice.eq(other.slice) @@ -606,10 +766,10 @@ pub mod containers { impl<'a, B: Ord + Clone> MyTrait<'a> for Greetings<'a, B> { type Owned = Vec; fn into_owned(self) -> Self::Owned { self.slice.to_vec() } - fn clone_onto(&self, other: &mut Self::Owned) { + fn clone_onto(&self, other: &mut Self::Owned) { self.slice.clone_into(other); } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { + fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { self.slice.cmp(&other[..]) } fn borrow_as(other: &'a Self::Owned) -> Self { diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index a5afee109..d96830575 100644 --- 
a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -9,6 +9,7 @@ //! and should consume fewer resources (computation and memory) when it applies. use std::rc::Rc; +use timely::container::columnation::{TimelyStack}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; pub type OrdValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,14 +34,14 @@ pub type OrdValSpine = Spine< pub type ColValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,()),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -49,28 +50,31 @@ pub type OrdKeySpine = Spine< pub type ColKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,()),T,R)>>>, >; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, MergeBatcher::Owned,::Owned),T,R)>,T>, - RcBuilder>>, + RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>, >; // /// A trace implementation backed by columnar storage. 
// pub type ColKeySpine = Spine>>>; + mod val_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::Container; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -148,7 +152,7 @@ mod val_batch { OrdValCursor { key_cursor: 0, val_cursor: 0, - phantom: std::marker::PhantomData, + phantom: PhantomData, } } fn len(&self) -> usize { @@ -189,7 +193,7 @@ mod val_batch { impl Merger> for OrdValMerger where - OrdValBatch: Batch::Time> + OrdValBatch: Batch::Time>, { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -498,7 +502,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdValBuilder { + pub struct OrdValBuilder { result: OrdValStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -506,9 +510,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdValBuilder { + impl OrdValBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. 
@@ -536,9 +541,15 @@ mod val_batch { } } - impl Builder for OrdValBuilder { + impl Builder for OrdValBuilder + where + L: Layout, + CI: Container + for<'a> BuilderInput = ::Item<'a>, Time=::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + for<'a> CI::Val<'a>: PushInto, + { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdValBatch; @@ -554,62 +565,35 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); - } - } else { - // New key; complete representation of prior key. 
- self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); + val.push_into(&mut self.result.vals); + key.push_into(&mut self.result.keys); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. 
- if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - self.result.keys.copy_push(key); } } @@ -634,10 +618,12 @@ mod key_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::Container; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -962,7 +948,7 @@ mod key_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdKeyBuilder { + pub struct OrdKeyBuilder { result: OrdKeyStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -970,9 +956,10 @@ mod key_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdKeyBuilder { + impl OrdKeyBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. 
@@ -1000,9 +987,14 @@ mod key_batch { } } - impl Builder for OrdKeyBuilder { + impl Builder for OrdKeyBuilder + where + L: Layout, + CI: Container + for<'a> BuilderInput = ::Item<'a>, Time=::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + { - type Input = ((::Key, ()), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdKeyBatch; @@ -1016,38 +1008,25 @@ mod key_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, ()), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - self.push_update(time.clone(), diff.clone()); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.keys.copy_push(key); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, _val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. 
+ self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + key.push_into(&mut self.result.keys); + } } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 088055507..67585f60d 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -9,6 +9,7 @@ use std::rc::Rc; use std::cmp::Ordering; use abomonation_derive::Abomonation; +use timely::container::columnation::TimelyStack; use crate::Hashable; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; pub type VecSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,7 +34,7 @@ pub type VecSpine = Spine< pub type ColSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; // /// A trace implementation backed by columnar storage. 
// pub type ColKeySpine = Spine>>>; @@ -68,7 +69,7 @@ where ::Output: PartialOrd { impl HashOrdered for HashWrapper { } -impl Hashable for HashWrapper { +impl Hashable for HashWrapper { type Output = T::Output; fn hashed(&self) -> Self::Output { self.inner.hashed() } } @@ -79,12 +80,14 @@ mod val_batch { use std::convert::TryInto; use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::Container; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::hashable::Hashable; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update, HashOrdered}; @@ -688,7 +691,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct RhhValBuilder + pub struct RhhValBuilder where ::Key: Default + HashOrdered, { @@ -699,9 +702,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. 
singletons: usize, + _marker: PhantomData, } - impl RhhValBuilder + impl RhhValBuilder where ::Key: Default + HashOrdered, { @@ -732,12 +736,14 @@ mod val_batch { } } - impl Builder for RhhValBuilder + impl Builder for RhhValBuilder where ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, + CI: Container + for<'a> BuilderInput = ::Item<'a>, Key<'a> = ::Key, Time=::Time, Diff=::Diff>, + for<'a> CI::Val<'a>: PushInto, { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = RhhValBatch; @@ -765,64 +771,36 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. 
self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); + val.push_into(&mut self.result.vals); + // Insert the key, but with no specified offset. + self.result.insert_key(key.borrow(), None); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. 
- if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key, None); } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index afee7c22e..9d5e6cbf8 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -340,11 +340,7 @@ pub trait Builder: Sized { /// /// The default implementation uses `self.copy` with references to the owned arguments. /// One should override it if the builder can take advantage of owned arguments. - fn push(&mut self, element: Self::Input) { - self.copy(&element); - } - /// Adds an element to the batch. - fn copy(&mut self, element: &Self::Input); + fn push(&mut self, chunk: &mut Self::Input); /// Completes building and returns the batch. fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; } @@ -454,8 +450,7 @@ pub mod rc_blanket_impls { type Time = B::Time; type Output = Rc; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -561,8 +556,7 @@ pub mod abomonated_blanket_impls { type Time = B::Time; type Output = Abomonated>; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } 
fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch));