Commit: Address comments
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
antiguru committed Jun 7, 2024
1 parent 5d53f4e commit 58a8e53
Showing 3 changed files with 66 additions and 30 deletions.
23 changes: 19 additions & 4 deletions src/consolidation.rs
@@ -215,8 +215,12 @@ where
}
}

/// Layout of data to be consolidated.
// TODO: This could be split in two, to separate sorting and consolidation.
/// Layout of containers and their read items to be consolidated.
///
/// This trait specifies behavior to extract keys and diffs from a container's read
/// items. Consolidation accumulates the diffs per key.
///
/// The trait requires `Container` as a supertrait to gain access to its `Item` GAT.
pub trait ConsolidateLayout: Container {
/// Key portion of data, essentially everything minus the diff
type Key<'a>: Eq where Self: 'a;
@@ -227,14 +227,22 @@ pub trait ConsolidateLayout: Container {
/// Owned diff type.
type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;

/// Deconstruct an item into key and diff.
/// Deconstruct an item into key and diff. Must be cheap.
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);

/// Push an element to a compatible container.
///
/// This function is odd to have, so let's explain why it exists. Ideally, the container
/// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
/// might never be in a position where this is true: vectors can push any `T`, which would
/// collide with a specific implementation for pushing tuples of mixed GATs and owned types.
///
/// For this reason, we expose a function here that takes a GAT key and an owned diff, and
/// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);

/// Compare two items by key to sort containers.
fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
}
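As an aside on the contract above, here is a standalone sketch of how `into_parts` and `push_with_diff` fit together for a plain vector of `(data, time, diff)` triples. The free functions and concrete tuple types are illustrative stand-ins, not the crate's actual implementation (the real `impl` for `Vec<(D, T, R)>` follows in the diff below).

// Illustrative stand-ins for the trait's two halves, specialized to a plain
// Vec<(D, T, R)>: the key is everything except the diff, and pushing re-assembles
// a full item from a key and an owned diff.
fn into_parts<D, T, R>(item: (D, T, R)) -> ((D, T), R) {
    let (data, time, diff) = item;
    ((data, time), diff)
}

fn push_with_diff<D, T, R>(target: &mut Vec<(D, T, R)>, key: (D, T), diff: R) {
    // "Patch" a pushable item back together, as the trait documentation describes.
    let (data, time) = key;
    target.push((data, time, diff));
}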

impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
@@ -311,10 +311,13 @@ where

// Consolidate sorted data.
let mut previous: Option<(C::Key<'_>, C::DiffOwned)> = None;
// TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't
// offer a suitable API.
for item in permutation.drain(..) {
let (key, diff) = C::into_parts(item);
match &mut previous {
// Initial iteration, remember key and diff.
// TODO: Opportunity for GatCow for diff.
None => previous = Some((key, diff.into_owned())),
Some((prevkey, d)) => {
// Second and following iteration, compare and accumulate or emit.
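For reference, the accumulate-or-emit pattern in the loop above can be sketched standalone over already-sorted `(key, diff)` pairs. The sketch below uses `String` keys and `i64` diffs as stand-ins for the key and `Semigroup` diff GATs; the function name and types are hypothetical.

// Consolidate a sorted run of (key, diff) pairs: diffs of equal keys are summed,
// and a pair is emitted only if its accumulated diff is non-zero.
fn consolidate_sorted(items: Vec<(String, i64)>, target: &mut Vec<(String, i64)>) {
    let mut previous: Option<(String, i64)> = None;
    for (key, diff) in items {
        match previous.take() {
            // Initial iteration: remember key and diff.
            None => previous = Some((key, diff)),
            Some((prev_key, acc)) => {
                if prev_key == key {
                    // Same key: accumulate the diff.
                    previous = Some((prev_key, acc + diff));
                } else {
                    // New key: emit the previous pair unless it cancelled out.
                    if acc != 0 {
                        target.push((prev_key, acc));
                    }
                    previous = Some((key, diff));
                }
            }
        }
    }
    // Flush the final pending pair.
    if let Some((key, acc)) = previous {
        if acc != 0 {
            target.push((key, acc));
        }
    }
}

A run such as [("a", 1), ("a", 2), ("b", -1), ("b", 1)] yields [("a", 3)]: the "a" diffs accumulate and the "b" diffs cancel.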
34 changes: 23 additions & 11 deletions src/trace/implementations/chunker.rs
@@ -297,7 +297,7 @@ where
impl<Input, Output, Consolidator> Chunker for ContainerChunker<Input, Output, Consolidator>
where
Input: Container,
for<'a> Output: SizableContainer + PushInto<Input::ItemRef<'a>>,
for<'a> Output: SizableContainer + PushInto<Input::Item<'a>> + PushInto<Input::ItemRef<'a>>,
Consolidator: ConsolidateContainer<Output>,
{
type Input = Input;
@@ -307,18 +307,30 @@ where
if self.pending.capacity() < Output::preferred_capacity() {
self.pending.reserve(Output::preferred_capacity() - self.pending.len());
}
// TODO: This uses `IterRef`, which isn't optimal for containers that can move.
for item in container.iter() {
self.pending.push(item);
if self.pending.len() == self.pending.capacity() {
self.consolidator.consolidate(&mut self.pending, &mut self.empty);
std::mem::swap(&mut self.pending, &mut self.empty);
self.empty.clear();
if self.pending.len() > self.pending.capacity() / 2 {
let form_batch = |this: &mut Self| {
if this.pending.len() == this.pending.capacity() {
this.consolidator.consolidate(&mut this.pending, &mut this.empty);
std::mem::swap(&mut this.pending, &mut this.empty);
this.empty.clear();
if this.pending.len() > this.pending.capacity() / 2 {
// Note that we're pushing non-full containers, which is a deviation from
// other implementations. The reason for this is that we cannot extract
// partial data from `self.pending`. We should revisit this in the future.
self.ready.push(std::mem::take(&mut self.pending));
// partial data from `this.pending`. We should revisit this in the future.
this.ready.push(std::mem::take(&mut this.pending));
}
}
};
match container {
RefOrMut::Ref(container) => {
for item in container.iter() {
self.pending.push(item);
form_batch(self);
}
}
RefOrMut::Mut(container) => {
for item in container.drain() {
self.pending.push(item);
form_batch(self);
}
}
}
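The batching policy above (consolidate `pending` in place once it fills up, and move it to `ready` only if consolidation left it more than half full) can be sketched standalone. `ToyChunker`, its `u64` keys and `i64` diffs, and the inline sort/dedup consolidation are illustrative assumptions, not the crate's types.

// Standalone sketch of the chunking policy: fill `pending`, consolidate it in
// place when full, and only move it to `ready` if consolidation still left it
// more than half full.
struct ToyChunker {
    pending: Vec<(u64, i64)>,
    ready: Vec<Vec<(u64, i64)>>,
    capacity: usize,
}

impl ToyChunker {
    fn push(&mut self, item: (u64, i64)) {
        self.pending.push(item);
        if self.pending.len() >= self.capacity {
            // Consolidate in place: sort by key, sum diffs of equal keys, drop zeros.
            self.pending.sort_by_key(|(key, _)| *key);
            self.pending.dedup_by(|later, kept| {
                if later.0 == kept.0 { kept.1 += later.1; true } else { false }
            });
            self.pending.retain(|(_, diff)| *diff != 0);
            // Only ship the chunk if consolidation left it more than half full.
            if self.pending.len() > self.capacity / 2 {
                self.ready.push(std::mem::take(&mut self.pending));
            }
        }
    }
}

The half-full test means a chunk whose contents mostly cancel stays in `pending` and keeps absorbing input instead of being shipped nearly empty.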
39 changes: 24 additions & 15 deletions src/trace/implementations/merge_batcher_flat.rs
@@ -123,6 +123,9 @@ where
let (key2, val2, time2, _diff) = FR::into_parts(head2.peek());
((key1, val1), time1).cmp(&((key2, val2), time2))
};
// TODO: The following less/greater branches could plausibly be a good moment for
// `copy_range`, on account of runs of records that might benefit more from a
// `memcpy`.
match cmp {
Ordering::Less => {
result.copy(head1.pop());
@@ -157,31 +157,33 @@ where
}
}

if result.len() > 0 {
output.push(result);
} else {
self.recycle(result, stash);
}

if !head1.is_empty() {
let mut result = self.empty(stash);
result.reserve_items(head1.iter());
for item in head1.iter() {
while !head1.is_empty() {
let advance = result.capacity() - result.len();
let iter = head1.iter().take(advance);
result.reserve_items(iter.clone());
for item in iter {
result.copy(item);
}
output.push(result);
head1.advance(advance);
result = self.empty(stash);
}
output.extend(list1);
self.recycle(head1.done(), stash);

if !head2.is_empty() {
let mut result = self.empty(stash);
result.reserve_items(head2.iter());
for item in head2.iter() {
while !head2.is_empty() {
let advance = result.capacity() - result.len();
let iter = head2.iter().take(advance);
result.reserve_items(iter.clone());
for item in iter {
result.copy(item);
}
output.push(result);
head2.advance(advance);
result = self.empty(stash);
}
output.extend(list2);
self.recycle(head2.done(), stash);
}
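The two loops above top up the current output chunk to its capacity and then start a fresh one, instead of reserving space for the whole remaining run at once. A standalone sketch of that chunking, with plain `Vec`s standing in for the flat containers, could look as follows (all names are illustrative):

// Drain a leftover run into capacity-sized chunks, topping up a possibly
// partially filled chunk first and shipping each chunk as it fills.
fn drain_in_chunks<T>(
    mut result: Vec<T>,     // possibly partially filled by the preceding merge
    mut remaining: Vec<T>,  // the run left over in one of the two inputs
    capacity: usize,
    output: &mut Vec<Vec<T>>,
) {
    // Assumes a non-zero chunk capacity; otherwise the loop cannot make progress.
    assert!(capacity > 0);
    while !remaining.is_empty() {
        // Copy at most as many items as the current chunk still has room for.
        let advance = capacity.saturating_sub(result.len()).min(remaining.len());
        result.extend(remaining.drain(..advance));
        output.push(result);
        result = Vec::with_capacity(capacity);
    }
    // Ship a final, partially filled chunk rather than dropping it; this flush
    // keeps the sketch self-contained when the leftover run is empty.
    if !result.is_empty() {
        output.push(result);
    }
}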

fn extract(
@@ -306,11 +311,15 @@ impl<R: Region> FlatStackQueue<R> {
}

fn is_empty(&self) -> bool {
self.head == self.list.len()
self.head >= self.list.len()
}

/// Return an iterator over the remaining elements.
fn iter(&self) -> impl Iterator<Item = R::ReadItem<'_>> + Clone {
self.list.iter().skip(self.head)
}

fn advance(&mut self, consumed: usize) {
self.head += consumed;
}
}
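The queue is a forward-only cursor over its backing storage: `iter` starts at `head`, and `advance` moves `head` by however many items the caller consumed. Because the merge above may request more items than remain, `head` can step past the end, which the relaxed `>=` in `is_empty` accommodates. A standalone sketch over a plain `Vec` (the name and the `Vec` backing are illustrative; the real queue wraps a flat region):

// Cursor-style queue: a backing vector plus a `head` index that only moves forward.
struct ToyQueue<T> {
    list: Vec<T>,
    head: usize,
}

impl<T> ToyQueue<T> {
    fn is_empty(&self) -> bool {
        // `>=` rather than `==`: `advance` may move `head` past the end when a
        // caller consumes "up to n" items from a shorter remainder.
        self.head >= self.list.len()
    }

    /// Iterate over the elements not yet consumed.
    fn iter(&self) -> impl Iterator<Item = &T> + Clone {
        self.list.iter().skip(self.head)
    }

    fn advance(&mut self, consumed: usize) {
        self.head += consumed;
    }
}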
