Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize flat container implementations to less specific regions #518

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use std::collections::VecDeque;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use timely::container::flatcontainer::{FlatStack, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::Data;
use crate::difference::{IsZero, Semigroup};
use crate::trace::cursor::IntoOwned;
use crate::trace::implementations::merge_batcher_flat::RegionUpdate;

/// Sorts and consolidates `vec`.
///
Expand Down Expand Up @@ -280,27 +280,29 @@ where
}
}

impl<K, V, T, R> ConsolidateLayout for FlatStack<TupleABCRegion<TupleABRegion<K, V>, T, R>>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

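This change is important: the `ConsolidateLayout` impl no longer names the concrete `TupleABCRegion<TupleABRegion<K, V>, T, R>` region. It now applies to `FlatStack<R>` for any `R: RegionUpdate`, and reaches the key, value, time, and diff through `R::into_parts` and the `R::reborrow_*` helpers instead of destructuring the tuple directly.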

impl<R> ConsolidateLayout for FlatStack<R>
where
for<'a> K: Region + Push<<K as Region>::ReadItem<'a>> + Clone + 'static,
for<'a> K::ReadItem<'a>: Ord + Copy,
for<'a> V: Region + Push<<V as Region>::ReadItem<'a>> + Clone + 'static,
for<'a> V::ReadItem<'a>: Ord + Copy,
for<'a> T: Region + Push<<T as Region>::ReadItem<'a>> + Clone + 'static,
for<'a> T::ReadItem<'a>: Ord + Copy,
R: Region + Push<<R as Region>::Owned> + Clone + 'static,
for<'a> R::Owned: Semigroup<R::ReadItem<'a>>,
R: RegionUpdate
+ Region
+ Clone
+ for<'a> Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::DiffOwned)>
+ 'static,
for<'a> R::DiffOwned: Semigroup<R::Diff<'a>>,
for<'a> R::ReadItem<'a>: Copy,
{
type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a;
type Diff<'a> = R::ReadItem<'a> where Self: 'a;
type DiffOwned = R::Owned;
type Key<'a> = (R::Key<'a>, R::Val<'a>, R::Time<'a>) where Self: 'a;
type Diff<'a> = R::Diff<'a> where Self: 'a;
type DiffOwned = R::DiffOwned;

fn into_parts(((key, val), time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
let (key, val, time, diff) = R::into_parts(item);
((key, val, time), diff)
}

fn cmp<'a>(((key1, val1), time1, _diff1): &Self::Item<'_>, ((key2, val2), time2, _diff2): &Self::Item<'_>) -> Ordering {
(K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1)).cmp(&(K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2)))
fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
let (key1, val1, time1, _diff1) = R::into_parts(*item1);
let (key2, val2, time2, _diff2) = R::into_parts(*item2);
(R::reborrow_key(key1), R::reborrow_val(val1), R::reborrow_time(time1)).cmp(&(R::reborrow_key(key2), R::reborrow_val(val2), R::reborrow_time(time2)))
}

fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) {
Expand Down
154 changes: 105 additions & 49 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ use crate::trace::cursor::IntoOwned;

/// A merger for flat stacks.
///
/// `MC` is a [`Region`] that implements [`MergerChunk`].
pub struct FlatcontainerMerger<MC> {
_marker: PhantomData<MC>,
/// `R` is a [`Region`] that implements [`RegionUpdate`].
pub struct FlatcontainerMerger<R> {
_marker: PhantomData<R>,
}

impl<MC> Default for FlatcontainerMerger<MC> {
impl<R> Default for FlatcontainerMerger<R> {
fn default() -> Self {
Self { _marker: PhantomData, }
}
}

impl<MC: Region> FlatcontainerMerger<MC> {
impl<R: Region> FlatcontainerMerger<R> {
const BUFFER_SIZE_BYTES: usize = 8 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<MC::Index>();
let size = ::std::mem::size_of::<R::Index>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
} else if size <= Self::BUFFER_SIZE_BYTES {
Expand All @@ -40,13 +40,13 @@ impl<MC: Region> FlatcontainerMerger<MC> {

/// Helper to get pre-sized vector from the stash.
#[inline]
fn empty(&self, stash: &mut Vec<FlatStack<MC>>) -> FlatStack<MC> {
fn empty(&self, stash: &mut Vec<FlatStack<R>>) -> FlatStack<R> {
stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity()))
}

/// Helper to return a chunk to the stash.
#[inline]
fn recycle(&self, mut chunk: FlatStack<MC>, stash: &mut Vec<FlatStack<MC>>) {
fn recycle(&self, mut chunk: FlatStack<R>, stash: &mut Vec<FlatStack<R>>) {
// TODO: Should we limit the size of `stash`?
if chunk.capacity() == self.chunk_capacity() {
chunk.clear();
Expand All @@ -56,79 +56,135 @@ impl<MC: Region> FlatcontainerMerger<MC> {
}

/// Behavior to dissect items of chunks in the merge batcher
pub trait MergerChunk: Region {
pub trait RegionUpdate: Region {
/// The key of the update
type Key<'a>: Ord where Self: 'a;
type Key<'a>: Copy + Ord where Self: 'a;
/// The value of the update
type Val<'a>: Ord where Self: 'a;
type Val<'a>: Copy + Ord where Self: 'a;
/// The time of the update
type Time<'a>: Ord where Self: 'a;
type Time<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::TimeOwned> where Self: 'a;
/// The owned time type.
type TimeOwned;
/// The diff of the update
type Diff<'a> where Self: 'a;
type Diff<'a>: Copy + IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;
/// The owned diff type.
type DiffOwned;

/// Split a read item into its constituents. Must be cheap.
fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>);

/// Converts a key into one with a narrower lifetime.
#[must_use]
fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b>
where
Self: 'a;

/// Converts a value into one with a narrower lifetime.
#[must_use]
fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b>
where
Self: 'a;

/// Converts a time into one with a narrower lifetime.
#[must_use]
fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b>
where
Self: 'a;

/// Converts a diff into one with a narrower lifetime.
#[must_use]
fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b>
where
Self: 'a;
}

impl<K,V,T,R> MergerChunk for TupleABCRegion<TupleABRegion<K, V>, T, R>
impl<KR, VR, TR, RR> RegionUpdate for TupleABCRegion<TupleABRegion<KR, VR>, TR, RR>
where
K: Region,
for<'a> K::ReadItem<'a>: Ord,
V: Region,
for<'a> V::ReadItem<'a>: Ord,
T: Region,
for<'a> T::ReadItem<'a>: Ord,
R: Region,
KR: Region,
for<'a> KR::ReadItem<'a>: Copy + Ord,
VR: Region,
for<'a> VR::ReadItem<'a>: Copy + Ord,
TR: Region,
for<'a> TR::ReadItem<'a>: Copy + Ord,
RR: Region,
for<'a> RR::ReadItem<'a>: Copy + Ord,
{
type Key<'a> = K::ReadItem<'a> where Self: 'a;
type Val<'a> = V::ReadItem<'a> where Self: 'a;
type Time<'a> = T::ReadItem<'a> where Self: 'a;
type TimeOwned = T::Owned;
type Diff<'a> = R::ReadItem<'a> where Self: 'a;
type DiffOwned = R::Owned;
type Key<'a> = KR::ReadItem<'a> where Self: 'a;
type Val<'a> = VR::ReadItem<'a> where Self: 'a;
type Time<'a> = TR::ReadItem<'a> where Self: 'a;
type TimeOwned = TR::Owned;
type Diff<'a> = RR::ReadItem<'a> where Self: 'a;
type DiffOwned = RR::Owned;

fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) {
(key, val, time, diff)
}

fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b>
where
Self: 'a
{
KR::reborrow(item)
}

fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b>
where
Self: 'a
{
VR::reborrow(item)
}

fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b>
where
Self: 'a
{
TR::reborrow(item)
}

fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b>
where
Self: 'a
{
RR::reborrow(item)
}
}

impl<MC> Merger for FlatcontainerMerger<MC>
impl<R> Merger for FlatcontainerMerger<R>
where
for<'a> MC: MergerChunk + Clone + 'static
+ ReserveItems<<MC as Region>::ReadItem<'a>>
+ Push<<MC as Region>::ReadItem<'a>>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>,
for<'a> MC::Time<'a>: PartialOrder<MC::TimeOwned> + Copy + IntoOwned<'a, Owned=MC::TimeOwned>,
for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>,
for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder<MC::Time<'a>> + Data,
for<'a> MC::DiffOwned: Default + Semigroup + Semigroup<MC::Diff<'a>> + Data,
for<'a> R: Region
+ RegionUpdate
+ Clone
+ ReserveItems<<R as Region>::ReadItem<'a>>
+ Push<<R as Region>::ReadItem<'a>>
+ Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, &'a R::DiffOwned)>
+ Push<((R::Key<'a>, R::Val<'a>), R::Time<'a>, R::Diff<'a>)>
+ 'static,
for<'a> R::Time<'a>: PartialOrder<R::TimeOwned> + Copy + IntoOwned<'a, Owned=R::TimeOwned>,
for<'a> R::Diff<'a>: IntoOwned<'a, Owned = R::DiffOwned>,
for<'a> R::TimeOwned: Ord + PartialOrder + PartialOrder<R::Time<'a>> + Data,
for<'a> R::DiffOwned: Default + Semigroup + Semigroup<R::Diff<'a>> + Data,
{
type Time = MC::TimeOwned;
type Chunk = FlatStack<MC>;
type Output = FlatStack<MC>;
type Time = R::TimeOwned;
type Chunk = FlatStack<R>;
type Output = FlatStack<R>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();

let mut head1 = <FlatStackQueue<MC>>::from(list1.next().unwrap_or_default());
let mut head2 = <FlatStackQueue<MC>>::from(list2.next().unwrap_or_default());
let mut head1 = <FlatStackQueue<R>>::from(list1.next().unwrap_or_default());
let mut head2 = <FlatStackQueue<R>>::from(list2.next().unwrap_or_default());

let mut result = self.empty(stash);

let mut diff = MC::DiffOwned::default();
let mut diff = R::DiffOwned::default();

// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
let cmp = {
let (key1, val1, time1, _diff) = MC::into_parts(head1.peek());
let (key2, val2, time2, _diff) = MC::into_parts(head2.peek());
let (key1, val1, time1, _diff) = R::into_parts(head1.peek());
let (key2, val2, time2, _diff) = R::into_parts(head2.peek());
((key1, val1), time1).cmp(&((key2, val2), time2))
};
// TODO: The following less/greater branches could plausibly be a good moment for
Expand All @@ -142,8 +198,8 @@ where
result.copy(head2.pop());
}
Ordering::Equal => {
let (key, val, time1, diff1) = MC::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop());
let (key, val, time1, diff1) = R::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = R::into_parts(head2.pop());
diff1.clone_onto(&mut diff);
diff.plus_equals(&diff2);
if !diff.is_zero() {
Expand Down Expand Up @@ -214,7 +270,7 @@ where
let mut ready = self.empty(stash);

for buffer in merged {
for (key, val, time, diff) in buffer.iter().map(MC::into_parts) {
for (key, val, time, diff) in buffer.iter().map(R::into_parts) {
if upper.less_equal(&time) {
frontier.insert_with(&time, |time| (*time).into_owned());
if keep.len() == keep.capacity() && !keep.is_empty() {
Expand Down Expand Up @@ -254,7 +310,7 @@ where
{
let mut prev_keyval = None;
for buffer in chain.iter() {
for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) {
for (key, val, time, _diff) in buffer.iter().map(R::into_parts) {
if !upper.less_equal(&time) {
if let Some((p_key, p_val)) = prev_keyval {
debug_assert!(p_key <= key);
Expand Down
32 changes: 13 additions & 19 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,12 @@ where

mod flatcontainer {
use timely::container::flatcontainer::{FlatStack, IntoOwned, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use timely::progress::Timestamp;

use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update};
use crate::trace::implementations::merge_batcher_flat::RegionUpdate;

impl<K, V, T, R> Update for FlatLayout<K, V, T, R>
where
Expand Down Expand Up @@ -448,36 +448,30 @@ mod flatcontainer {
type OffsetContainer = OffsetList;
}

impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for FlatStack<TupleABCRegion<TupleABRegion<K,V>,T,R>>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

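This change is important: `BuilderInput` is now implemented for `FlatStack<R>` with any `R: RegionUpdate`, rather than only for `FlatStack<TupleABCRegion<TupleABRegion<K, V>, T, R>>`. Items are split with `R::into_parts`, and keys and values are compared through `R::reborrow_key` and `R::reborrow_val`.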

impl<KBC,VBC, R> BuilderInput<KBC, VBC> for FlatStack<R>
where
K: Region + Clone + 'static,
V: Region + Clone + 'static,
T: Region + Clone + 'static,
R: Region + Clone + 'static,
for<'a> K::ReadItem<'a>: Copy + Ord,
for<'a> V::ReadItem<'a>: Copy + Ord,
for<'a> T::ReadItem<'a>: Copy + Ord,
for<'a> R::ReadItem<'a>: Copy + Ord,
R: RegionUpdate + Region + Clone + 'static,
KBC: BatchContainer,
VBC: BatchContainer,
for<'a> KBC::ReadItem<'a>: PartialEq<K::ReadItem<'a>>,
for<'a> VBC::ReadItem<'a>: PartialEq<V::ReadItem<'a>>,
for<'a> KBC::ReadItem<'a>: PartialEq<R::Key<'a>>,
for<'a> VBC::ReadItem<'a>: PartialEq<R::Val<'a>>,
{
type Key<'a> = K::ReadItem<'a>;
type Val<'a> = V::ReadItem<'a>;
type Time = T::Owned;
type Diff = R::Owned;
type Key<'a> = R::Key<'a>;
type Val<'a> = R::Val<'a>;
type Time = R::TimeOwned;
type Diff = R::DiffOwned;

fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
let (key, val, time, diff) = R::into_parts(item);
(key, val, time.into_owned(), diff.into_owned())
}

fn key_eq(this: &Self::Key<'_>, other: KBC::ReadItem<'_>) -> bool {
KBC::reborrow(other) == K::reborrow(*this)
KBC::reborrow(other) == R::reborrow_key(*this)
}

fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool {
VBC::reborrow(other) == V::reborrow(*this)
VBC::reborrow(other) == R::reborrow_val(*this)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunke
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger};
use crate::trace::implementations::merge_batcher_col::ColumnationMerger;
use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, MergerChunk};
use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, RegionUpdate};
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout};
Expand All @@ -44,7 +44,7 @@ pub type ColValSpine<K, V, T, R> = Spine<
/// A trace implementation backed by flatcontainer storage.
pub type FlatValSpine<L, R, C> = Spine<
Rc<OrdValBatch<L>>,
MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>,
MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as RegionUpdate>::TimeOwned>,
RcBuilder<OrdValBuilder<L, FlatStack<R>>>,
>;

Expand Down Expand Up @@ -74,7 +74,7 @@ pub type ColKeySpine<K, T, R> = Spine<
/// A trace implementation backed by flatcontainer storage.
pub type FlatKeySpine<L, R, C> = Spine<
Rc<OrdKeyBatch<L>>,
MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as MergerChunk>::TimeOwned>,
MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as RegionUpdate>::TimeOwned>,
RcBuilder<OrdKeyBuilder<L, FlatStack<R>>>,
>;

Expand Down