Skip to content

Commit

Permalink
Pass data from batcher to builder by chunk (#491)
Browse files Browse the repository at this point in the history
* Pass data from batcher to builder by chain

Currently, the data shared between the batcher and the builder are
individual tuples, either moved or by reference. This limits flexibility
around what kind of data can be provided to a builder, i.e., it has to
be in the form of tuples, either owned or a reference to a fully-formed
one. This works fine for vector-like structures, but will not work for
containers that like to arrange their data differently.

This change alters the contract between the batcher and the builder to
provide chunks instead of individual items (it does not require
_chains_.) The data in the chunks must be sorted, and subsequent calls
must maintain order, too. The input containers need to implement
`BuilderInput`, a type that describes how a container's items can be
broken into key, value, time, and diff, where key and value can be
references or owned data, as long as they can be pushed into the
underlying key and value containers.

The change has some quirks around comparing keys to keys already in the
builder. The types can differ, and the best solution I could come up
with was to add two explicit comparison functions to `BuilderInput` to
compare keys and values. While it is not elegant, it allows us to move
forward with this change, without adding nightmare-inducing trait bounds
all-over.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

* Address feedback

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>

---------

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru authored May 23, 2024
1 parent 9731d7f commit 85b126c
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 190 deletions.
8 changes: 6 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ where

use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;
use timely::container::PushInto;

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -292,7 +294,8 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -311,7 +314,8 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
6 changes: 2 additions & 4 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
F: Fn(Tr::Val<'_>) -> V + 'static,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -282,9 +282,7 @@ where
}
// Must insert updates in (key, val, time) order.
updates.sort();
for update in updates.drain(..) {
builder.push(update);
}
builder.push(&mut updates);
}
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
prev_frontier.clone_from(&upper);
Expand Down
23 changes: 15 additions & 8 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
//! to the key and the list of values.
//! The function is expected to populate a list of output values.

use timely::Container;
use timely::container::PushInto;
use crate::hashable::Hashable;
use crate::{Data, ExchangeData, Collection};
use crate::difference::{Semigroup, Abelian};
Expand Down Expand Up @@ -252,7 +254,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| {
Expand All @@ -274,7 +276,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
;
}
Expand All @@ -293,7 +295,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -312,7 +314,8 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: Container,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down Expand Up @@ -454,6 +457,8 @@ where
builders.push(T2::Builder::new());
}

let mut buffer = Default::default();

// cursors for navigating input and output traces.
let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
let source_storage = &source_storage;
Expand Down Expand Up @@ -531,7 +536,9 @@ where
for index in 0 .. buffers.len() {
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
builders[index].push(((key.into_owned(), val), time, diff));
((key.into_owned(), val), time, diff).push_into(&mut buffer);
builders[index].push(&mut buffer);
buffer.clear();
}
}
}
Expand Down Expand Up @@ -648,7 +655,7 @@ where
where
F: Fn(C2::Val<'_>) -> V,
L: FnMut(
C1::Key<'a>,
C1::Key<'a>,
&[(C1::Val<'a>, C1::Diff)],
&mut Vec<(V, C2::Diff)>,
&mut Vec<(V, C2::Diff)>,
Expand Down Expand Up @@ -728,7 +735,7 @@ mod history_replay {
where
F: Fn(C2::Val<'_>) -> V,
L: FnMut(
C1::Key<'a>,
C1::Key<'a>,
&[(C1::Val<'a>, C1::Diff)],
&mut Vec<(V, C2::Diff)>,
&mut Vec<(V, C2::Diff)>,
Expand Down Expand Up @@ -1020,7 +1027,7 @@ mod history_replay {
new_interesting.push(next_time.clone());
debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
}


// Update `meet` to track the meet of each source of times.
meet = None;//T::maximum();
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ where
type Time = T;
type Input = Vec<((K, V), T, R)>;
type Chunk = Vec<((K, V), T, R)>;
type Output = ((K, V), T, R);
type Output = Vec<((K, V), T, R)>;

fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
Expand Down Expand Up @@ -497,8 +497,8 @@ where
}
let mut builder = B::with_capacity(keys, vals, upds);

for datum in chain.drain(..).flatten() {
builder.push(datum);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
Expand Down
9 changes: 3 additions & 6 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
type Time = T;
type Input = Vec<((K, V), T, R)>;
type Chunk = TimelyStack<((K, V), T, R)>;
type Output = ((K, V), T, R);
type Output = TimelyStack<((K, V), T, R)>;

fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
Expand Down Expand Up @@ -290,11 +290,8 @@ where
}
}
let mut builder = B::with_capacity(keys, vals, upds);

for chunk in chain.drain(..) {
for datum in chunk.iter() {
builder.copy(datum);
}
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
Expand Down
Loading

0 comments on commit 85b126c

Please sign in to comment.