From b07aa9eb050b0b70b4ac95791252c471fc73e053 Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Wed, 24 Apr 2024 15:54:07 -0400
Subject: [PATCH] Undo some changes, rip out old columnated batcher

Signed-off-by: Moritz Hoffmann
---
 src/operators/arrange/arrangement.rs        |   4 +-
 src/operators/arrange/upsert.rs             |  10 +-
 src/operators/reduce.rs                     |  21 +-
 src/trace/implementations/merge_batcher.rs  |  62 ++-
 .../implementations/merge_batcher_col.rs    | 416 +-----------------
 src/trace/implementations/ord_neu.rs        | 168 ++++---
 src/trace/implementations/rhh.rs            | 112 ++---
 src/trace/mod.rs                            |  35 +-
 8 files changed, 237 insertions(+), 591 deletions(-)

diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs
index 3b14646e3..c22749a27 100644
--- a/src/operators/arrange/arrangement.rs
+++ b/src/operators/arrange/arrangement.rs
@@ -284,7 +284,7 @@ where
         F: Fn(T2::Val<'_>) -> V + 'static,
         T2::Diff: Abelian,
         T2::Batch: Batch,
-        T2::Builder: Builder>,
+        T2::Builder: Builder,
         L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
     {
         self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
@@ -303,7 +303,7 @@ where
         V: Data,
         F: Fn(T2::Val<'_>) -> V + 'static,
         T2::Batch: Batch,
-        T2::Builder: Builder>,
+        T2::Builder: Builder,
         L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
     {
         use crate::operators::reduce::reduce_trace;
diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index 43c0e08f2..758ec8df3 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -138,7 +138,7 @@ where
     F: Fn(Tr::Val<'_>) -> V + 'static,
     Tr::Time: TotalOrder+ExchangeData,
     Tr::Batch: Batch,
-    Tr::Builder: Builder>,
+    Tr::Builder: Builder,
 {
     let mut reader: Option> = None;
@@ -241,6 +241,7 @@ where
                 // Prepare a cursor to the existing arrangement, and a batch builder for
                 // new stuff that we add.
                 let (mut trace_cursor, trace_storage) = reader_local.cursor();
+                let mut builder = Tr::Builder::new();
                 for (key, mut list) in to_process.drain(..) {
                     use trace::cursor::MyTrait;
@@ -281,10 +282,11 @@ where
                     }
                     // Must insert updates in (key, val, time) order.
                     updates.sort();
+                    for update in updates.drain(..) {
+                        builder.push(update);
+                    }
                 }
-                let mut batches = vec![std::mem::take(&mut updates)];
-                let batch = Tr::Builder::from_batches(&mut batches, prev_frontier.borrow(), upper.borrow(), Antichain::from_elem(G::Timestamp::minimum()).borrow());
-                updates = batches.into_iter().next().unwrap_or_default();
+                let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
                 prev_frontier.clone_from(&upper);
                 // Communicate `batch` to the arrangement and the stream.
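For orientation, the per-update building protocol that the upsert hunk above switches to -- construct a builder, `push` each update in `(key, val, time)` order, then seal with `done(lower, upper, since)` -- looks roughly like the following self-contained sketch. `SketchBuilder`, the `String`/`u64`/`i64` update type, and the plain `Vec<u64>` frontiers are illustrative stand-ins, not this crate's actual `Tr::Builder` or `Antichain` types.

// Illustrative only: `SketchBuilder` stands in for `Tr::Builder`, and plain
// `Vec<u64>` frontiers stand in for `Antichain<G::Timestamp>`.
struct SketchBuilder {
    updates: Vec<((String, String), u64, i64)>,
}

impl SketchBuilder {
    fn new() -> Self {
        SketchBuilder { updates: Vec::new() }
    }

    // Accepts one owned update; updates must arrive in (key, val, time) order.
    fn push(&mut self, element: ((String, String), u64, i64)) {
        self.updates.push(element);
    }

    // Seals the accumulated updates into a "batch" described by its
    // lower/upper/since frontiers.
    fn done(self, lower: Vec<u64>, upper: Vec<u64>, since: Vec<u64>)
        -> (Vec<((String, String), u64, i64)>, Vec<u64>, Vec<u64>, Vec<u64>)
    {
        (self.updates, lower, upper, since)
    }
}

fn main() {
    let mut updates = vec![
        (("b".to_string(), "y".to_string()), 2, 1),
        (("a".to_string(), "x".to_string()), 1, 1),
    ];
    // Mirror the operator above: sort into (key, val, time) order, push each
    // update, then seal the batch for the interval [lower, upper).
    updates.sort();
    let mut builder = SketchBuilder::new();
    for update in updates.drain(..) {
        builder.push(update);
    }
    let (batch, lower, upper, _since) = builder.done(vec![0], vec![3], vec![0]);
    assert_eq!(batch.len(), 2);
    println!("batch of {} updates covering [{:?}, {:?})", batch.len(), lower, upper);
}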
diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index 94254c569..dda549bca 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -252,7 +252,7 @@ pub trait ReduceCore where
         F: Fn(T2::Val<'_>) -> V + 'static,
         T2::Diff: Abelian,
         T2::Batch: Batch,
-        T2::Builder: Builder>,
+        T2::Builder: Builder,
         L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
     {
         self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| {
@@ -274,7 +274,7 @@ pub trait ReduceCore where
         T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static,
         F: Fn(T2::Val<'_>) -> V + 'static,
         T2::Batch: Batch,
-        T2::Builder: Builder>,
+        T2::Builder: Builder,
         L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
         ;
 }
@@ -293,7 +293,7 @@ where
     F: Fn(T2::Val<'_>) -> V + 'static,
     T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static,
     T2::Batch: Batch,
-    T2::Builder: Builder>,
+    T2::Builder: Builder,
     L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
 {
     self.arrange_by_key_named(&format!("Arrange: {}", name))
@@ -312,7 +312,7 @@ where
     V: Data,
     F: Fn(T2::Val<'_>) -> V + 'static,
     T2::Batch: Batch,
-    T2::Builder: Builder>,
+    T2::Builder: Builder,
     L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
 {
     let mut result_trace = None;
@@ -529,20 +529,9 @@ where
                     // (ii) that the buffers are time-ordered, and (iii) that the builders accept
                     // arbitrarily ordered times.
                     for index in 0 .. buffers.len() {
-                        // TODO: This doesn't reuse allocations for `update`.
-                        let mut update = Vec::with_capacity(1024);
                         buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
                         for (val, time, diff) in buffers[index].1.drain(..) {
-                            update.push(((key.into_owned(), val), time, diff));
-                            if update.len() == update.capacity() {
-                                let mut chain = vec![update];
-                                builders[index].push_batches(&mut chain);
-                                update = chain.pop().unwrap_or_else(|| Vec::with_capacity(1024));
-                                update.clear();
-                            }
-                        }
-                        if !update.is_empty() {
-                            builders[index].push_batches(&mut vec![update]);
+                            builders[index].push(((key.into_owned(), val), time, diff));
                         }
                     }
                 }
diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs
index e0e55a2f4..874f9dfd3 100644
--- a/src/trace/implementations/merge_batcher.rs
+++ b/src/trace/implementations/merge_batcher.rs
@@ -10,6 +10,7 @@ use timely::logging_core::Logger;
 use timely::progress::{frontier::Antichain, Timestamp};
 use timely::progress::frontier::AntichainRef;
 
+use crate::Data;
 use crate::consolidation::consolidate_updates;
 use crate::difference::Semigroup;
 use crate::logging::{BatcherEvent, DifferentialEvent};
@@ -42,7 +43,7 @@ where
     T: Timestamp,
 {
     type Input = M::Input;
-    type Output = M::Batch;
+    type Output = M::Output;
     type Time = T;
 
     fn new(logger: Option>, operator_id: usize) -> Self {
@@ -95,7 +96,7 @@ where
 
         self.stash.clear();
 
-        let seal = B::from_batches(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
+        let seal = M::seal::(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
         self.lower = upper;
         seal
     }
@@ -185,6 +186,8 @@ pub trait Merger: Default {
     type Input;
     /// The internal representation of batches of data.
     type Batch: Container;
+    /// The output type
+    type Output;
     /// The type of time in frontiers to extract updates.
     type Time;
     /// Accept a fresh batch of input data.
@@ -196,6 +199,9 @@ pub trait Merger: Default { /// Extract ready updates based on the `upper` frontier. fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, keep: &mut Vec, stash: &mut Vec); + /// Build from a chain + fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output; + /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations). fn account(batch: &Self::Batch) -> (usize, usize, usize, usize); } @@ -241,12 +247,13 @@ impl VecMerger { } } -impl Merger for VecMerger<(D,T,R)> { +impl Merger for VecMerger<((K,V),T,R)> { type Time = T; - type Input = Vec<(D,T,R)>; - type Batch = Vec<(D,T,R)>; + type Input = Vec<((K,V),T,R)>; + type Batch = Vec<((K,V),T,R)>; + type Output = ((K,V), T, R); - fn accept(&mut self, batch: RefOrMut>, stash: &mut Vec) -> Vec> { + fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { // `batch` is either a shared reference or an owned allocations. let mut owned = match batch { RefOrMut::Ref(vec) => { @@ -271,11 +278,11 @@ impl>) -> Vec> { + fn finish(&mut self, _stash: &mut Vec) -> Vec { vec![] } - fn merge(&mut self, list1: Vec>, list2: Vec>, output: &mut Vec>, stash: &mut Vec>) { + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); @@ -343,7 +350,7 @@ impl>, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec>, kept: &mut Vec>, stash: &mut Vec>) { + fn extract(&mut self, merged: Vec, upper: AntichainRef, frontier: &mut Antichain, readied: &mut Vec, kept: &mut Vec, stash: &mut Vec) { let mut keep = self.empty(stash); let mut ready = self.empty(stash); @@ -377,6 +384,43 @@ impl>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output { + let mut builder = { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for buffer in chain.iter() { + for ((key, val), time, _) in buffer.iter() { + if !upper.less_equal(time) { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } + else if p_val != val { + vals += 1; + } + upds += 1; + } else { + keys += 1; + vals += 1; + upds += 1; + } + prev_keyval = Some((key, val)); + } + } + } + B::with_capacity(keys, vals, upds) + }; + + for datum in chain.drain(..).flatten() { + builder.push(datum); + } + + builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + } + fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { (batch.len(), 0, 0, 0) } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index bc86e0b60..b0d8b2e1f 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -4,18 +4,14 @@ use std::cmp::Ordering; use timely::{Container, Data, PartialOrder}; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; -use timely::logging::WorkerIdentifier; -use timely::logging_core::Logger; -use timely::progress::{frontier::Antichain, Timestamp}; -use timely::progress::frontier::AntichainRef; +use timely::progress::frontier::{Antichain, AntichainRef}; use crate::consolidation::consolidate_updates; use crate::difference::Semigroup; -use crate::logging::{BatcherEvent, DifferentialEvent}; -use crate::trace::{Batcher, 
Builder}; +use crate::trace::Builder; use crate::trace::implementations::merge_batcher::Merger; -/// TODO +/// A merger for timely stacks pub struct ColumnationMerger { pending: Vec, } @@ -61,15 +57,17 @@ impl ColumnationMerger { } } -impl Merger for ColumnationMerger<(D, T, R)> +impl Merger for ColumnationMerger<((K,V), T, R)> where - D: Columnation + Ord + Data, + K: Columnation + Ord + Data, + V: Columnation + Ord + Data, T: Columnation + Ord + PartialOrder + Data, R: Columnation + Semigroup + 'static, { type Time = T; - type Input = Vec<(D,T,R)>; - type Batch = TimelyStack<(D,T,R)>; + type Input = Vec<((K,V),T,R)>; + type Batch = TimelyStack<((K,V),T,R)>; + type Output = ((K,V),T,R); fn accept(&mut self, batch: RefOrMut, stash: &mut Vec) -> Vec { // `batch` is either a shared reference or an owned allocations. @@ -217,84 +215,13 @@ where } } - fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { - let (mut size, mut capacity, mut allocations) = (0, 0, 0); - let cb = |siz, cap| { - size += siz; - capacity += cap; - allocations += 1; - }; - batch.heap_size(cb); - (batch.len(), size, capacity, allocations) - } -} - - -/// Creates batches from unordered tuples. -pub struct ColumnatedMergeBatcher -where - K: Columnation + 'static, - V: Columnation + 'static, - T: Columnation + 'static, - D: Columnation + 'static, -{ - sorter: MergeSorterColumnation<(K, V), T, D>, - lower: Antichain, - frontier: Antichain, -} - -impl Batcher for ColumnatedMergeBatcher -where - K: Columnation + Ord + Clone + 'static, - V: Columnation + Ord + Clone + 'static, - T: Columnation + Timestamp + 'static, - D: Columnation + Semigroup + 'static, -{ - type Input = Vec<((K,V),T,D)>; - type Output = Vec<((K,V),T,D)>; - type Time = T; - - fn new(logger: Option>, operator_id: usize) -> Self { - ColumnatedMergeBatcher { - sorter: MergeSorterColumnation::new(logger, operator_id), - frontier: Antichain::new(), - lower: Antichain::from_elem(::minimum()), - } - } - - #[inline] - fn push_batch(&mut self, batch: RefOrMut) { - // `batch` is either a shared reference or an owned allocations. - match batch { - RefOrMut::Ref(reference) => { - // This is a moment at which we could capture the allocations backing - // `batch` into a different form of region, rather than just cloning. - self.sorter.push(&mut reference.clone()); - }, - RefOrMut::Mut(reference) => { - self.sorter.push(reference); - } - } - } - - // Sealing a batch means finding those updates with times not greater or equal to any time - // in `upper`. All updates must have time greater or equal to the previously used `upper`, - // which we call `lower`, by assumption that after sealing a batcher we receive no more - // updates with times not greater or equal to `upper`. - #[inline] - fn seal>(&mut self, upper: Antichain) -> B::Output { - - let mut merged = Default::default(); - self.sorter.finish_into(&mut merged); - - // Determine the number of distinct keys, values, and updates, - // and form a builder pre-sized for these numbers. 
+ fn seal>(chain: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> B::Output { let mut builder = { let mut keys = 0; let mut vals = 0; let mut upds = 0; let mut prev_keyval = None; - for buffer in merged.iter() { + for buffer in chain.iter() { for ((key, val), time, _) in buffer.iter() { if !upper.less_equal(time) { if let Some((p_key, p_val)) = prev_keyval { @@ -318,72 +245,26 @@ where B::with_capacity(keys, vals, upds) }; - let mut kept = Vec::new(); - let mut keep = TimelyStack::default(); - let mut readied = Vec::new(); - let mut ready = Vec::default(); - - self.frontier.clear(); - - for buffer in merged.drain(..) { - for datum @ ((_key, _val), time, _diff) in &buffer[..] { - if upper.less_equal(time) { - self.frontier.insert(time.clone()); - if keep.is_empty() { - if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() { - keep = self.sorter.empty(); - } - } else if keep.len() == keep.capacity() { - kept.push(keep); - keep = self.sorter.empty(); - } - keep.copy(datum); - } - else { - if ready.is_empty() { - if ready.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() { - ready = Vec::with_capacity(MergeSorterColumnation::<(K, V), T, D>::buffer_size()); - } - } else if ready.len() == ready.capacity() { - readied.push(ready); - ready = Vec::with_capacity(MergeSorterColumnation::<(K, V), T, D>::buffer_size()); - } - ready.push(datum.clone()); - } - } - // Recycling buffer. - self.sorter.recycle(buffer); - } - - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !kept.is_empty() { - self.sorter.push_list(kept); - } - - if !ready.is_empty() { - readied.push(ready); - } - if !readied.is_empty() { - builder.push_batches(&mut readied); + for datum in chain.iter().map(|ts| ts.iter()).flatten() { + builder.copy(datum); } - // Drain buffers (fast reclamation). - self.sorter.clear_stash(); - - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); - self.lower = upper; - seal + builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - /// The frontier of elements remaining after the most recent call to `self.seal`. - fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { - self.frontier.borrow() + fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) { + let (mut size, mut capacity, mut allocations) = (0, 0, 0); + let cb = |siz, cap| { + size += siz; + capacity += cap; + allocations += 1; + }; + batch.heap_size(cb); + (batch.len(), size, capacity, allocations) } } + struct TimelyStackQueue { list: TimelyStack, head: usize, @@ -424,252 +305,3 @@ impl TimelyStackQueue { self.list[self.head..].iter() } } - -struct MergeSorterColumnation { - /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. - queue: Vec>>, - stash: Vec>, - pending: Vec<(D, T, R)>, - logger: Option>, - operator_id: usize, -} - -impl MergeSorterColumnation { - - const BUFFER_SIZE_BYTES: usize = 64 << 10; - - /// Buffer size (number of elements) to use for new/empty buffers. - const fn buffer_size() -> usize { - let size = std::mem::size_of::<(D, T, R)>(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 - } - } - - /// Buffer size for pending updates, currently 2 * [`Self::buffer_size`]. 
- const fn pending_buffer_size() -> usize { - Self::buffer_size() * 2 - } - - fn new(logger: Option>, operator_id: usize) -> Self { - Self { - logger, - operator_id, - queue: Vec::new(), - stash: Vec::new(), - pending: Vec::new(), - } - } - - fn empty(&mut self) -> TimelyStack<(D, T, R)> { - self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) - } - - /// Remove all elements from the stash. - fn clear_stash(&mut self) { - self.stash.clear(); - } - - /// Insert an empty buffer into the stash. Panics if the buffer is not empty. - fn recycle(&mut self, mut buffer: TimelyStack<(D, T, R)>) { - if buffer.capacity() == Self::buffer_size() && self.stash.len() < 2 { - buffer.clear(); - self.stash.push(buffer); - } - } - - fn push(&mut self, batch: &mut Vec<(D, T, R)>) { - // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. - if self.pending.capacity() < Self::pending_buffer_size() { - self.pending.reserve(Self::pending_buffer_size() - self.pending.capacity()); - } - - while !batch.is_empty() { - self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); - if self.pending.len() == self.pending.capacity() { - crate::consolidation::consolidate_updates(&mut self.pending); - if self.pending.len() > self.pending.capacity() / 2 { - // Flush if `self.pending` is more than half full after consolidation. - self.flush_pending(); - } - } - } - } - - /// Move all elements in `pending` into `queue`. The data in `pending` must be compacted and - /// sorted. After this function returns, `self.pending` is empty. - fn flush_pending(&mut self) { - if !self.pending.is_empty() { - let mut stack = self.empty(); - stack.reserve_items(self.pending.iter()); - for tuple in self.pending.drain(..) { - stack.copy(&tuple); - } - self.queue_push(vec![stack]); - while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } - } - } - - // This is awkward, because it isn't a power-of-two length any more, and we don't want - // to break it down to be so. - fn push_list(&mut self, list: Vec>) { - while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } - self.queue_push(list); - } - - fn finish_into(&mut self, target: &mut Vec>) { - crate::consolidation::consolidate_updates(&mut self.pending); - self.flush_pending(); - while self.queue.len() > 1 { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } - - if let Some(mut last) = self.queue_pop() { - std::mem::swap(&mut last, target); - } - } - - // merges two sorted input lists into one sorted output list. - fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { - - // TODO: `list1` and `list2` get dropped; would be better to reuse? 
- let mut output = Vec::with_capacity(list1.len() + list2.len()); - let mut result = self.empty(); - - let mut list1 = list1.into_iter(); - let mut list2 = list2.into_iter(); - - let mut head1 = TimelyStackQueue::from(list1.next().unwrap_or_default()); - let mut head2 = TimelyStackQueue::from(list2.next().unwrap_or_default()); - - // while we have valid data in each input, merge. - while !head1.is_empty() && !head2.is_empty() { - - while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { - - let cmp = { - let x = head1.peek(); - let y = head2.peek(); - (&x.0, &x.1).cmp(&(&y.0, &y.1)) - }; - match cmp { - Ordering::Less => { result.copy(head1.pop()); } - Ordering::Greater => { result.copy(head2.pop()); } - Ordering::Equal => { - let (data1, time1, diff1) = head1.pop(); - let (_data2, _time2, diff2) = head2.pop(); - let mut diff1 = diff1.clone(); - diff1.plus_equals(diff2); - if !diff1.is_zero() { - result.copy_destructured(data1, time1, &diff1); - } - } - } - } - - if result.capacity() == result.len() { - output.push(result); - result = self.empty(); - } - - if head1.is_empty() { - self.recycle(head1.done()); - head1 = TimelyStackQueue::from(list1.next().unwrap_or_default()); - } - if head2.is_empty() { - self.recycle(head2.done()); - head2 = TimelyStackQueue::from(list2.next().unwrap_or_default()); - } - } - - if result.len() > 0 { - output.push(result); - } else { - self.recycle(result); - } - - if !head1.is_empty() { - let mut result = self.empty(); - result.reserve_items(head1.iter()); - for item in head1.iter() { result.copy(item); } - output.push(result); - } - output.extend(list1); - - if !head2.is_empty() { - let mut result = self.empty(); - result.reserve_items(head2.iter()); - for item in head2.iter() { result.copy(item); } - output.push(result); - } - output.extend(list2); - - output - } -} - -impl MergeSorterColumnation { - /// Pop a batch from `self.queue` and account size changes. - #[inline] - fn queue_pop(&mut self) -> Option>> { - let batch = self.queue.pop(); - self.account(batch.iter().flatten(), -1); - batch - } - - /// Push a batch to `self.queue` and account size changes. - #[inline] - fn queue_push(&mut self, batch: Vec>) { - self.account(&batch, 1); - self.queue.push(batch); - } - - /// Account size changes. Only performs work if a logger exists. - /// - /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute - /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. 
- fn account<'a, I: IntoIterator>>(&self, items: I, diff: isize) { - if let Some(logger) = &self.logger { - let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize); - for stack in items { - records = records.saturating_add_unsigned(stack.len()); - stack.heap_size(|s, c| { - siz = siz.saturating_add_unsigned(s); - capacity = capacity.saturating_add_unsigned(c); - allocations += isize::from(c > 0); - }); - } - logger.log(BatcherEvent { - operator: self.operator_id, - records_diff: records * diff, - size_diff: siz * diff, - capacity_diff: capacity * diff, - allocations_diff: allocations * diff, - }) - } - } - -} - -impl Drop for MergeSorterColumnation { - fn drop(&mut self) { - while self.queue_pop().is_some() { } - } -} diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 0ece74bc1..a5afee109 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -12,7 +12,7 @@ use std::rc::Rc; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; -use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; +use crate::trace::implementations::merge_batcher_col::ColumnationMerger; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack, Preferred}; @@ -32,8 +32,7 @@ pub type OrdValSpine = Spine< /// A trace implementation backed by columnar storage. pub type ColValSpine = Spine< Rc>>, - // MergeBatcher, T>, - ColumnatedMergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; @@ -49,16 +48,14 @@ pub type OrdKeySpine = Spine< /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine< Rc>>, - // MergeBatcher, T>, - ColumnatedMergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, - // MergeBatcher::Owned,::Owned),T,R)>,T>, - ColumnatedMergeBatcher<::Owned,::Owned,T,R>, + MergeBatcher::Owned,::Owned),T,R)>,T>, RcBuilder>>, >; @@ -541,7 +538,7 @@ mod val_batch { impl Builder for OrdValBuilder { - type Input = Vec<((::Key, ::Val), ::Time, ::Diff)>; + type Input = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -560,60 +557,59 @@ mod val_batch { } } - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in batches.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); - } + #[inline] + fn push(&mut self, ((key, val), time, diff): Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. 
+ self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + self.result.vals.push(val); } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time, diff); + self.result.vals.push(val); + self.result.keys.push(key); } - let mut new = Self::with_capacity(keys, vals, upds); - new.push_batches(batches); - new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - fn push_batches(&mut self, batches: &mut Vec) { - for ((key, val), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.vals.push(val); - } + #[inline] + fn copy(&mut self, ((key, val), time, diff): &Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { + // TODO: here we could look for repetition, and not push the update in that case. + // More logic (and state) would be required to correctly wrangle this. + self.push_update(time.clone(), diff.clone()); } else { - // New key; complete representation of prior key. + // New value; complete representation of prior value. self.result.vals_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - self.result.keys.push(key); + self.push_update(time.clone(), diff.clone()); + self.result.vals.copy_push(val); } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. 
+ if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time.clone(), diff.clone()); + self.result.vals.copy_push(val); + self.result.keys.copy_push(key); } } @@ -1006,7 +1002,7 @@ mod key_batch { impl Builder for OrdKeyBuilder { - type Input = Vec<((::Key, ()), ::Time, ::Diff)>; + type Input = ((::Key, ()), ::Time, ::Diff); type Time = ::Time; type Output = OrdKeyBatch; @@ -1023,45 +1019,35 @@ mod key_batch { } } - /// Build from batches - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - let mut keys = 0; - let mut upds = 0; - let mut prev_key = None; - for buffer in batches.iter() { - for ((key, ()), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some(p_key) = prev_key { - if p_key != key { - keys += 1; - } - upds += 1; - } else { - keys += 1; - upds += 1; - } - prev_key = Some(key); - } - } + #[inline] + fn push(&mut self, ((key, ()), time, diff): Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + self.result.keys.push(key); } - let mut new = Self::with_capacity(keys, 0, upds); - new.push_batches(batches); - new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) } - fn push_batches(&mut self, batches: &mut Vec) { - for ((key, ()), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.keys.push(key); - } + #[inline] + fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { + self.push_update(time.clone(), diff.clone()); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time.clone(), diff.clone()); + self.result.keys.copy_push(key); } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index bceebde29..64cac268b 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -6,11 +6,14 @@ //! for example wrapped types that implement `Ord` that way. 
use std::rc::Rc; +use std::cmp::Ordering; + +use abomonation_derive::Abomonation; use crate::Hashable; -use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; -use crate::trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; +use crate::trace::implementations::merge_batcher_col::ColumnationMerger; +use crate::trace::implementations::spine_fueled::Spine; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack}; @@ -29,7 +32,7 @@ pub type VecSpine = Spine< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine< Rc>>, - ColumnatedMergeBatcher, + MergeBatcher, T>, RcBuilder>>, >; // /// A trace implementation backed by columnar storage. @@ -47,9 +50,6 @@ pub struct HashWrapper { pub inner: T } -use std::cmp::Ordering; -use abomonation_derive::Abomonation; - impl PartialOrd for HashWrapper where ::Output: PartialOrd { fn partial_cmp(&self, other: &Self) -> Option { @@ -729,41 +729,10 @@ mod val_batch { ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, { - type Input = Vec<((::Key, ::Val), ::Time, ::Diff)>; + type Input = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = RhhValBatch; - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in batches.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); - } - } - } - let mut new = Self::with_capacity(keys, vals, upds); - new.push_batches(batches); - new.done(lower.to_owned(), upper.to_owned(), since.to_owned()) - } - fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { // Double the capacity for RHH; probably excessive. @@ -791,30 +760,61 @@ mod val_batch { } } - fn push_batches(&mut self, batches: &mut Vec) { - for ((key, val), time, diff) in batches.iter_mut().map(|batch| batch.drain(..)).flatten() { - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.vals.push(val); - } + #[inline] + fn push(&mut self, ((key, val), time, diff): Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { + self.push_update(time, diff); } else { - // New key; complete representation of prior key. + // New value; complete representation of prior value. 
self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time, diff); + self.result.vals.push(val); + // Insert the key, but with no specified offset. + self.result.insert_key(key.borrow(), None); + } + } + + #[inline] + fn copy(&mut self, ((key, val), time, diff): &Self::Input) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { + // TODO: here we could look for repetition, and not push the update in that case. + // More logic (and state) would be required to correctly wrangle this. + self.push_update(time.clone(), diff.clone()); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time.clone(), diff.clone()); + self.result.vals.copy_push(val); + } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); + self.push_update(time.clone(), diff.clone()); + self.result.vals.copy_push(val); + // Insert the key, but with no specified offset. + self.result.insert_key(key, None); } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 095e3fc84..6887283a2 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -320,16 +320,13 @@ pub trait Batcher { /// Functionality for building batches from ordered update sequences. pub trait Builder: Sized { - /// Input type. + /// Input item type. type Input; /// Timestamp type. type Time: Timestamp; /// Output batch type. type Output; - /// Build from batches - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output; - /// Allocates an empty builder. /// /// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`. @@ -338,14 +335,17 @@ pub trait Builder: Sized { /// Allocates an empty builder with capacity for the specified keys, values, and updates. /// /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates. - // #[deprecated] fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self; - /// Adds elements in sorted order to the batch. - /// TODO: Refine the `batches` parameter to allow allocation reuse. - // #[deprecated] - fn push_batches(&mut self, batches: &mut Vec); + /// Adds an element to the batch. + /// + /// The default implementation uses `self.copy` with references to the owned arguments. + /// One should override it if the builder can take advantage of owned arguments. 
+ fn push(&mut self, element: Self::Input) { + self.copy(&element); + } + /// Adds an element to the batch. + fn copy(&mut self, element: &Self::Input); /// Completes building and returns the batch. - // #[deprecated] fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; } @@ -453,11 +453,9 @@ pub mod rc_blanket_impls { type Input = B::Input; type Time = B::Time; type Output = Rc; - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - Rc::new(B::from_batches(batches, lower, upper, since)) - } fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push_batches(&mut self, batches: &mut Vec) { self.builder.push_batches(batches) } + fn push(&mut self, element: Self::Input) { self.builder.push(element) } + fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -562,14 +560,9 @@ pub mod abomonated_blanket_impls { type Input = B::Input; type Time = B::Time; type Output = Abomonated>; - fn from_batches(batches: &mut Vec, lower: AntichainRef, upper: AntichainRef, since: AntichainRef) -> Self::Output { - let batch = B::from_batches(batches, lower, upper, since); - let mut bytes = Vec::with_capacity(measure(&batch)); - unsafe { abomonation::encode(&batch, &mut bytes).unwrap() }; - unsafe { Abomonated::::new(bytes).unwrap() } - } fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push_batches(&mut self, batches: &mut Vec) { self.builder.push_batches(batches) } + fn push(&mut self, element: Self::Input) { self.builder.push(element) } + fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch));
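The `push`/`copy` pair introduced in the `Builder` trait above splits owned and by-reference insertion, with `push` defaulting to `self.copy(&element)`. A minimal sketch of that relationship, using a hypothetical `ToyBuilder` trait and `VecBuilder` type rather than this crate's actual `Builder` implementations:

// Minimal sketch of the push/copy relationship: `push` takes an owned element
// and by default just re-borrows it and calls `copy`; implementations that can
// exploit ownership override `push` to avoid cloning.
trait ToyBuilder: Sized {
    type Input;

    /// Adds an element by reference (the only required method).
    fn copy(&mut self, element: &Self::Input);

    /// Adds an owned element; the default forwards to `copy`.
    fn push(&mut self, element: Self::Input) {
        self.copy(&element);
    }
}

// A hypothetical builder that stores owned data, so it overrides `push` to
// move elements in, while `copy` must clone.
struct VecBuilder<T> {
    data: Vec<T>,
}

impl<T: Clone> ToyBuilder for VecBuilder<T> {
    type Input = T;
    fn copy(&mut self, element: &T) {
        self.data.push(element.clone());
    }
    fn push(&mut self, element: T) {
        self.data.push(element); // no clone needed for owned input
    }
}

fn main() {
    let mut builder = VecBuilder { data: Vec::new() };
    builder.push(String::from("owned"));      // moved in
    builder.copy(&String::from("borrowed"));  // cloned
    assert_eq!(builder.data.len(), 2);
}

Wrapper builders, as in the `RcBuilder` and `AbomonatedBuilder` blanket impls above, simply forward both methods to the inner builder.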