Skip to content

Commit

Permalink
Pass data from batcher to builder by chain
Browse files Browse the repository at this point in the history
Currently, the data shared between the batcher and the builder are
individual tuples, either moved or by reference. This limits flexibility
around what kind of data can be provided to a builder, i.e., it has to
be in the form of tuples, either owned or a reference to a fully-formed
one. This works fine for vector-like structures, but will not work for
containers that arrange their data differently.

This change alters the contract between the batcher and the builder to
provide chunks instead of individual items (it does not require
_chains_). The data in the chunks must be sorted, and subsequent calls
must maintain order, too. The input containers need to implement
`BuilderInput`, a type that describes how a container's items can be
broken into key, value, time, and diff, where key and value can be
references or owned data, as long as they can be pushed into the
underlying key and value containers.

The change has some quirks around comparing keys to keys already in the
builder. The types can differ, and the best solution I could come up
with was to add two explicit comparison functions to `BuilderInput` to
compare keys and values. While it is not elegant, it allows us to move
forward with this change, without adding nightmare-inducing trait bounds
all over.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed May 23, 2024
1 parent 9171e73 commit f58f15e
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 191 deletions.
7 changes: 5 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ where

use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::container::{PushContainer, PushInto};

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -292,7 +293,8 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: PushContainer,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -311,7 +313,8 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: PushContainer,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
6 changes: 2 additions & 4 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
F: Fn(Tr::Val<'_>) -> V + 'static,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -282,9 +282,7 @@ where
}
// Must insert updates in (key, val, time) order.
updates.sort();
for update in updates.drain(..) {
builder.push(update);
}
builder.push(&mut updates);
}
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
prev_frontier.clone_from(&upper);
Expand Down
29 changes: 17 additions & 12 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! to the key and the list of values.
//! The function is expected to populate a list of output values.

use timely::container::{PushContainer, PushInto};
use crate::hashable::Hashable;
use crate::{Data, ExchangeData, Collection};
use crate::difference::{Semigroup, Abelian};
Expand Down Expand Up @@ -252,7 +253,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| {
Expand All @@ -274,7 +275,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
;
}
Expand All @@ -293,7 +294,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((K, V), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = Vec<((K, V), T2::Time, T2::Diff)>>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -312,7 +313,8 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
<T2::Builder as Builder>::Input: PushContainer,
((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<<T2::Builder as Builder>::Input>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down Expand Up @@ -448,10 +450,10 @@ where
// TODO: It would be better if all updates went into one batch, but timely dataflow prevents
// this as long as it requires that there is only one capability for each message.
let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new();
let mut builders = Vec::new();
let mut chains = Vec::new();
for cap in capabilities.iter() {
buffers.push((cap.time().clone(), Vec::new()));
builders.push(T2::Builder::new());
chains.push(Default::default());
}

// cursors for navigating input and output traces.
Expand Down Expand Up @@ -531,7 +533,8 @@ where
for index in 0 .. buffers.len() {
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
builders[index].push(((key.into_owned(), val), time, diff));
// TODO(antiguru): This is dumb. Need to stage data and then reveal it.
((key.into_owned(), val), time, diff).push_into(&mut chains[index]);
}
}
}
Expand All @@ -543,8 +546,10 @@ where
output_lower.extend(lower_limit.borrow().iter().cloned());

// build and ship each batch (because only one capability per message).
for (index, builder) in builders.drain(..).enumerate() {

for (index, mut chain) in chains.drain(..).enumerate() {
let mut builder = T2::Builder::new();
// TODO(antiguru): Form actual chains.
builder.push(&mut chain);
// Form the upper limit of the next batch, which includes all times greater
// than the input batch, or the capabilities from i + 1 onward.
output_upper.clear();
Expand Down Expand Up @@ -648,7 +653,7 @@ where
where
F: Fn(C2::Val<'_>) -> V,
L: FnMut(
C1::Key<'a>,
C1::Key<'a>,
&[(C1::Val<'a>, C1::Diff)],
&mut Vec<(V, C2::Diff)>,
&mut Vec<(V, C2::Diff)>,
Expand Down Expand Up @@ -728,7 +733,7 @@ mod history_replay {
where
F: Fn(C2::Val<'_>) -> V,
L: FnMut(
C1::Key<'a>,
C1::Key<'a>,
&[(C1::Val<'a>, C1::Diff)],
&mut Vec<(V, C2::Diff)>,
&mut Vec<(V, C2::Diff)>,
Expand Down Expand Up @@ -1020,7 +1025,7 @@ mod history_replay {
new_interesting.push(next_time.clone());
debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
}


// Update `meet` to track the meet of each source of times.
meet = None;//T::maximum();
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ where
type Time = T;
type Input = Vec<((K, V), T, R)>;
type Chunk = Vec<((K, V), T, R)>;
type Output = ((K, V), T, R);
type Output = Vec<((K, V), T, R)>;

fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
Expand Down Expand Up @@ -497,8 +497,8 @@ where
}
let mut builder = B::with_capacity(keys, vals, upds);

for datum in chain.drain(..).flatten() {
builder.push(datum);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
Expand Down
9 changes: 3 additions & 6 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
type Time = T;
type Input = Vec<((K, V), T, R)>;
type Chunk = TimelyStack<((K, V), T, R)>;
type Output = ((K, V), T, R);
type Output = TimelyStack<((K, V), T, R)>;

fn accept(&mut self, container: RefOrMut<Self::Input>, stash: &mut Vec<Self::Chunk>) -> Vec<Self::Chunk> {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
Expand Down Expand Up @@ -290,11 +290,8 @@ where
}
}
let mut builder = B::with_capacity(keys, vals, upds);

for chunk in chain.drain(..) {
for datum in chunk.iter() {
builder.copy(datum);
}
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
Expand Down
Loading

0 comments on commit f58f15e

Please sign in to comment.