From 602714522358fc54e09e8180e27ffc7b5c1e2919 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 20 Nov 2023 15:49:11 -0500 Subject: [PATCH] Encode singleton repetition update cleverly (#421) * Encode singleton repetition update cleverly * Correct off-by-one error in singleton counting * Remove redundant code --- src/trace/implementations/ord_neu.rs | 100 +++++++++++++++++++++++---- 1 file changed, 85 insertions(+), 15 deletions(-) diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 96f27a753..03891f8c2 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -69,7 +69,15 @@ mod val_batch { } /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. fn updates_for_value(&self, index: usize) -> (usize, usize) { - (self.vals_offs[index].try_into().ok().unwrap(), self.vals_offs[index+1].try_into().ok().unwrap()) + let mut lower = self.vals_offs[index].try_into().ok().unwrap(); + let upper = self.vals_offs[index+1].try_into().ok().unwrap(); + // We use equal lower and upper to encode "singleton update; just before here". + // It should only apply when there is a prior element, so `lower` should be greater than zero. + if lower == upper { + assert!(lower > 0); + lower -= 1; + } + (lower, upper) } } @@ -80,6 +88,12 @@ mod val_batch { pub storage: OrdValStorage, /// Description of the update times this layer represents. pub description: Description<::Time>, + /// The number of updates reflected in the batch. + /// + /// We track this separately from `storage` because due to the singleton optimization, + /// we may have many more updates than `storage.updates.len()`. It should equal that + /// length, plus the number of singleton optimizations employed. + pub updates: usize, } impl BatchReader for OrdValBatch { @@ -99,7 +113,7 @@ mod val_batch { fn len(&self) -> usize { // Normally this would be `self.updates.len()`, but we have a clever compact encoding. // Perhaps we should count such exceptions to the side, to provide a correct accounting. - self.storage.updates.len() + self.updates } fn description(&self) -> &Description<::Time> { &self.description } } @@ -130,6 +144,8 @@ mod val_batch { /// We could emulate a `ChangeBatch` here, with related compaction smarts. /// A `ChangeBatch` itself needs an `i64` diff type, which we have not. update_stash: Vec<(::Time, ::Diff)>, + /// Counts the number of singleton-optimized entries, that we may correctly count the updates. + singletons: usize, } impl Merger> for OrdValMerger { @@ -162,10 +178,12 @@ mod val_batch { result: storage, description, update_stash: Vec::new(), + singletons: 0, } } fn done(self) -> OrdValBatch { OrdValBatch { + updates: self.result.updates.len() + self.singletons, storage: self.result, description: self.description, } @@ -348,8 +366,18 @@ mod val_batch { use consolidation; consolidation::consolidate(&mut self.update_stash); if !self.update_stash.is_empty() { - for item in self.update_stash.drain(..) { - self.result.updates.push(item); + // If there is a single element, equal to a just-prior recorded update, + // we push nothing and report an unincremented offset to encode this case. + if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() { + // Just clear out update_stash, as we won't drain it here. + self.update_stash.clear(); + self.singletons += 1; + } + else { + // Conventional; move `update_stash` into `updates`. + for item in self.update_stash.drain(..) { + self.result.updates.push(item); + } } Some(self.result.updates.len().try_into().ok().unwrap()) } else { @@ -425,6 +453,40 @@ mod val_batch { /// A builder for creating layers from unsorted update tuples. pub struct OrdValBuilder { result: OrdValStorage, + singleton: Option<(::Time, ::Diff)>, + /// Counts the number of singleton optimizations we performed. + /// + /// This number allows us to correctly gauge the total number of updates reflected in a batch, + /// even though `updates.len()` may be much shorter than this amount. + singletons: usize, + } + + impl OrdValBuilder { + /// Pushes a single update, which may set `self.singleton` rather than push. + /// + /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. + /// However, for "clever" reasons it does not do this. Instead, it looks for opportunities + /// to encode a singleton update with an "absert" update: repeating the most recent offset. + /// This otherwise invalid state encodes "look back one element". + /// + /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the + /// previously pushed update exactly. In that case, we do not push the update into `updates`. + /// The update tuple is retained in `self.singleton` in case we see another update and need + /// to recover the singleton to push it into `updates` to join the second update. + fn push_update(&mut self, time: ::Time, diff: ::Diff) { + // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. + if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) { + assert!(self.singleton.is_none()); + self.singleton = Some((time, diff)); + } + else { + // If we have pushed a single element, we need to copy it out to meet this one. + if let Some(time_diff) = self.singleton.take() { + self.result.updates.push(time_diff); + } + self.result.updates.push((time, diff)); + } + } } impl Builder> for OrdValBuilder { @@ -439,7 +501,9 @@ mod val_batch { vals: L::ValContainer::with_capacity(cap), vals_offs: Vec::with_capacity(cap), updates: L::UpdContainer::with_capacity(cap), - } + }, + singleton: None, + singletons: 0, } } @@ -450,20 +514,20 @@ mod val_batch { if self.result.keys.last() == Some(&key) { // Perhaps this is a continuation of an already received value. if self.result.vals.last() == Some(&val) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.result.updates.push((time, diff)); + self.push_update(time, diff); } else { // New value; complete representation of prior value. self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); - self.result.updates.push((time, diff)); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); + if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); - self.result.updates.push((time, diff)); + self.push_update(time, diff); self.result.vals.push(val); self.result.keys.push(key); } @@ -478,18 +542,22 @@ mod val_batch { if self.result.vals.last() == Some(val) { // TODO: here we could look for repetition, and not push the update in that case. // More logic (and state) would be required to correctly wrangle this. - self.result.updates.push((time.clone(), diff.clone())); + self.push_update(time.clone(), diff.clone()); } else { // New value; complete representation of prior value. self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); - self.result.updates.push((time.clone(), diff.clone())); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time.clone(), diff.clone()); self.result.vals.copy(val); } } else { // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); - self.result.updates.push((time.clone(), diff.clone())); + self.push_update(time.clone(), diff.clone()); self.result.vals.copy(val); self.result.keys.copy(key); } @@ -498,10 +566,12 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { // Record the final offsets - self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); - + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); OrdValBatch { + updates: self.result.updates.len() + self.singletons, storage: self.result, description: Description::new(lower, upper, since), }