Skip to content

Commit

Permalink
Encode singleton repetition update cleverly (#421)
Browse files Browse the repository at this point in the history
* Encode singleton repetition update cleverly

* Correct off-by-one error in singleton counting

* Remove redundant code
  • Loading branch information
frankmcsherry authored Nov 20, 2023
1 parent 42e032d commit 6027145
Showing 1 changed file with 85 additions and 15 deletions.
100 changes: 85 additions & 15 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,15 @@ mod val_batch {
}
/// Lower and upper bounds in `self.updates` corresponding to the value at `index`.
fn updates_for_value(&self, index: usize) -> (usize, usize) {
(self.vals_offs[index].try_into().ok().unwrap(), self.vals_offs[index+1].try_into().ok().unwrap())
let mut lower = self.vals_offs[index].try_into().ok().unwrap();
let upper = self.vals_offs[index+1].try_into().ok().unwrap();
// We use equal lower and upper to encode "singleton update; just before here".
// It should only apply when there is a prior element, so `lower` should be greater than zero.
if lower == upper {
assert!(lower > 0);
lower -= 1;
}
(lower, upper)
}
}

Expand All @@ -80,6 +88,12 @@ mod val_batch {
pub storage: OrdValStorage<L>,
/// Description of the update times this layer represents.
pub description: Description<<L::Target as Update>::Time>,
/// The number of updates reflected in the batch.
///
/// We track this separately from `storage` because due to the singleton optimization,
/// we may have many more updates than `storage.updates.len()`. It should equal that
/// length, plus the number of singleton optimizations employed.
pub updates: usize,
}

impl<L: Layout> BatchReader for OrdValBatch<L> {
Expand All @@ -99,7 +113,7 @@ mod val_batch {
fn len(&self) -> usize {
// Normally this would be `self.updates.len()`, but we have a clever compact encoding.
// Perhaps we should count such exceptions to the side, to provide a correct accounting.
self.storage.updates.len()
self.updates
}
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
}
Expand Down Expand Up @@ -130,6 +144,8 @@ mod val_batch {
/// We could emulate a `ChangeBatch` here, with related compaction smarts.
/// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
/// Counts the number of singleton-optimized entries, that we may correctly count the updates.
singletons: usize,
}

impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L> {
Expand Down Expand Up @@ -162,10 +178,12 @@ mod val_batch {
result: storage,
description,
update_stash: Vec::new(),
singletons: 0,
}
}
fn done(self) -> OrdValBatch<L> {
OrdValBatch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: self.description,
}
Expand Down Expand Up @@ -348,8 +366,18 @@ mod val_batch {
use consolidation;
consolidation::consolidate(&mut self.update_stash);
if !self.update_stash.is_empty() {
for item in self.update_stash.drain(..) {
self.result.updates.push(item);
// If there is a single element, equal to a just-prior recorded update,
// we push nothing and report an unincremented offset to encode this case.
if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() {
// Just clear out update_stash, as we won't drain it here.
self.update_stash.clear();
self.singletons += 1;
}
else {
// Conventional; move `update_stash` into `updates`.
for item in self.update_stash.drain(..) {
self.result.updates.push(item);
}
}
Some(self.result.updates.len().try_into().ok().unwrap())
} else {
Expand Down Expand Up @@ -425,6 +453,40 @@ mod val_batch {
/// A builder for creating layers from unsorted update tuples.
pub struct OrdValBuilder<L: Layout> {
result: OrdValStorage<L>,
singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
/// Counts the number of singleton optimizations we performed.
///
/// This number allows us to correctly gauge the total number of updates reflected in a batch,
/// even though `updates.len()` may be much shorter than this amount.
singletons: usize,
}

impl<L: Layout> OrdValBuilder<L> {
/// Pushes a single update, which may set `self.singleton` rather than push.
///
/// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`.
/// However, for "clever" reasons it does not do this. Instead, it looks for opportunities
/// to encode a singleton update with an "absert" update: repeating the most recent offset.
/// This otherwise invalid state encodes "look back one element".
///
/// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
/// previously pushed update exactly. In that case, we do not push the update into `updates`.
/// The update tuple is retained in `self.singleton` in case we see another update and need
/// to recover the singleton to push it into `updates` to join the second update.
fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
// If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
if self.result.updates.last().map(|(t, d)| t == &time && d == &diff) == Some(true) {
assert!(self.singleton.is_none());
self.singleton = Some((time, diff));
}
else {
// If we have pushed a single element, we need to copy it out to meet this one.
if let Some(time_diff) = self.singleton.take() {
self.result.updates.push(time_diff);
}
self.result.updates.push((time, diff));
}
}
}

impl<L: Layout> Builder<OrdValBatch<L>> for OrdValBuilder<L> {
Expand All @@ -439,7 +501,9 @@ mod val_batch {
vals: L::ValContainer::with_capacity(cap),
vals_offs: Vec::with_capacity(cap),
updates: L::UpdContainer::with_capacity(cap),
}
},
singleton: None,
singletons: 0,
}
}

Expand All @@ -450,20 +514,20 @@ mod val_batch {
if self.result.keys.last() == Some(&key) {
// Perhaps this is a continuation of an already received value.
if self.result.vals.last() == Some(&val) {
// TODO: here we could look for repetition, and not push the update in that case.
// More logic (and state) would be required to correctly wrangle this.
self.result.updates.push((time, diff));
self.push_update(time, diff);
} else {
// New value; complete representation of prior value.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
self.result.updates.push((time, diff));
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
self.result.vals.push(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
self.result.updates.push((time, diff));
self.push_update(time, diff);
self.result.vals.push(val);
self.result.keys.push(key);
}
Expand All @@ -478,18 +542,22 @@ mod val_batch {
if self.result.vals.last() == Some(val) {
// TODO: here we could look for repetition, and not push the update in that case.
// More logic (and state) would be required to correctly wrangle this.
self.result.updates.push((time.clone(), diff.clone()));
self.push_update(time.clone(), diff.clone());
} else {
// New value; complete representation of prior value.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
self.result.updates.push((time.clone(), diff.clone()));
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time.clone(), diff.clone());
self.result.vals.copy(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
self.result.updates.push((time.clone(), diff.clone()));
self.push_update(time.clone(), diff.clone());
self.result.vals.copy(val);
self.result.keys.copy(key);
}
Expand All @@ -498,10 +566,12 @@ mod val_batch {
#[inline(never)]
fn done(mut self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdValBatch<L> {
// Record the final offsets
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());

// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
OrdValBatch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: Description::new(lower, upper, since),
}
Expand Down

0 comments on commit 6027145

Please sign in to comment.