Skip to content

Commit

Permalink
Undo some changes, rip out old columnated batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed Apr 24, 2024
1 parent 3b4073b commit b07aa9e
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 591 deletions.
4 changes: 2 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = Vec<((T1::KeyOwned, V), T2::Time, T2::Diff)>>,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| {
Expand All @@ -303,7 +303,7 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = Vec<((T1::KeyOwned,V), T2::Time, T2::Diff)>>,
T2::Builder: Builder<Input = ((T1::KeyOwned,V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
10 changes: 6 additions & 4 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
F: Fn(Tr::Val<'_>) -> V + 'static,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = Vec<((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>>,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, V), Tr::Time, Tr::Diff)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -241,6 +241,7 @@ where
// Prepare a cursor to the existing arrangement, and a batch builder for
// new stuff that we add.
let (mut trace_cursor, trace_storage) = reader_local.cursor();
let mut builder = Tr::Builder::new();
for (key, mut list) in to_process.drain(..) {

use trace::cursor::MyTrait;
Expand Down Expand Up @@ -281,10 +282,11 @@ where
}
// Must insert updates in (key, val, time) order.
updates.sort();
for update in updates.drain(..) {
builder.push(update);
}
}
let mut batches = vec![std::mem::take(&mut updates)];
let batch = Tr::Builder::from_batches(&mut batches, prev_frontier.borrow(), upper.borrow(), Antichain::from_elem(G::Timestamp::minimum()).borrow());
updates = batches.into_iter().next().unwrap_or_default();
let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
prev_frontier.clone_from(&upper);

// Communicate `batch` to the arrangement and the stream.
Expand Down
21 changes: 5 additions & 16 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| {
Expand All @@ -274,7 +274,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>>,
T2::Builder: Builder<Input = ((K::Owned, V), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
;
}
Expand All @@ -293,7 +293,7 @@ where
F: Fn(T2::Val<'_>) -> V + 'static,
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = Vec<((K, V), T2::Time, T2::Diff)>>,
T2::Builder: Builder<Input = ((K, V), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -312,7 +312,7 @@ where
V: Data,
F: Fn(T2::Val<'_>) -> V + 'static,
T2::Batch: Batch,
T2::Builder: Builder<Input = Vec<((T1::KeyOwned, V), T2::Time, T2::Diff)>>,
T2::Builder: Builder<Input = ((T1::KeyOwned, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down Expand Up @@ -529,20 +529,9 @@ where
// (ii) that the buffers are time-ordered, and (iii) that the builders accept
// arbitrarily ordered times.
for index in 0 .. buffers.len() {
// TODO: This doesn't reuse allocations for `update`.
let mut update = Vec::with_capacity(1024);
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
update.push(((key.into_owned(), val), time, diff));
if update.len() == update.capacity() {
let mut chain = vec![update];
builders[index].push_batches(&mut chain);
update = chain.pop().unwrap_or_else(|| Vec::with_capacity(1024));
update.clear();
}
}
if !update.is_empty() {
builders[index].push_batches(&mut vec![update]);
builders[index].push(((key.into_owned(), val), time, diff));
}
}
}
Expand Down
62 changes: 53 additions & 9 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::progress::frontier::AntichainRef;

use crate::Data;
use crate::consolidation::consolidate_updates;
use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
Expand Down Expand Up @@ -42,7 +43,7 @@ where
T: Timestamp,
{
type Input = M::Input;
type Output = M::Batch;
type Output = M::Output;
type Time = T;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Expand Down Expand Up @@ -95,7 +96,7 @@ where

self.stash.clear();

let seal = B::from_batches(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
let seal = M::seal::<B>(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
self.lower = upper;
seal
}
Expand Down Expand Up @@ -185,6 +186,8 @@ pub trait Merger: Default {
type Input;
/// The internal representation of batches of data.
type Batch: Container;
/// The type of data handed to the builder when sealing; `seal` requires `Builder<Input = Self::Output>`.
type Output;
/// The type of time in frontiers to extract updates.
type Time;
/// Accept a fresh batch of input data.
Expand All @@ -196,6 +199,9 @@ pub trait Merger: Default {
/// Extract ready updates based on the `upper` frontier.
fn extract(&mut self, merged: Vec<Self::Batch>, upper: AntichainRef<Self::Time>, frontier: &mut Antichain<Self::Time>, readied: &mut Vec<Self::Batch>, keep: &mut Vec<Self::Batch>, stash: &mut Vec<Self::Batch>);

/// Build the output of builder `B` from a chain of batches, described by the `lower`, `upper`, and `since` frontiers.
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(chain: &mut Vec<Self::Batch>, lower: AntichainRef<Self::Time>, upper: AntichainRef<Self::Time>, since: AntichainRef<Self::Time>) -> B::Output;

/// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
fn account(batch: &Self::Batch) -> (usize, usize, usize, usize);
}
Expand Down Expand Up @@ -241,12 +247,13 @@ impl<T> VecMerger<T> {
}
}

impl<D: Ord+Clone+'static, T: Ord + PartialOrder + Clone+'static,R: Semigroup+'static> Merger for VecMerger<(D,T,R)> {
impl<K: Data, V: Data, T: Ord + PartialOrder + Clone+'static,R: Semigroup+'static> Merger for VecMerger<((K,V),T,R)> {
type Time = T;
type Input = Vec<(D,T,R)>;
type Batch = Vec<(D,T,R)>;
type Input = Vec<((K,V),T,R)>;
type Batch = Vec<((K,V),T,R)>;
type Output = ((K,V), T, R);

fn accept(&mut self, batch: RefOrMut<Vec<(D,T,R)>>, stash: &mut Vec<Self::Batch>) -> Vec<Vec<(D, T, R)>> {
fn accept(&mut self, batch: RefOrMut<Self::Batch>, stash: &mut Vec<Self::Batch>) -> Vec<Self::Batch> {
// `batch` is either a shared reference or an owned allocation.
let mut owned = match batch {
RefOrMut::Ref(vec) => {
Expand All @@ -271,11 +278,11 @@ impl<D: Ord+Clone+'static, T: Ord + PartialOrder + Clone+'static,R: Semigroup+'s
}
}

fn finish(&mut self, _stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<Vec<(D, T, R)>> {
/// Returns the batches this merger still holds.
///
/// This implementation hands back an empty list; `_stash` is not consulted.
fn finish(&mut self, _stash: &mut Vec<Self::Batch>) -> Vec<Self::Batch> {
    Vec::new()
}

fn merge(&mut self, list1: Vec<Vec<(D, T, R)>>, list2: Vec<Vec<(D, T, R)>>, output: &mut Vec<Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
fn merge(&mut self, list1: Vec<Self::Batch>, list2: Vec<Self::Batch>, output: &mut Vec<Self::Batch>, stash: &mut Vec<Self::Batch>) {
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();
let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
Expand Down Expand Up @@ -343,7 +350,7 @@ impl<D: Ord+Clone+'static, T: Ord + PartialOrder + Clone+'static,R: Semigroup+'s
output.extend(list2);
}

fn extract(&mut self, merged: Vec<Vec<(D, T, R)>>, upper: AntichainRef<Self::Time>, frontier: &mut Antichain<Self::Time>, readied: &mut Vec<Vec<(D, T, R)>>, kept: &mut Vec<Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
fn extract(&mut self, merged: Vec<Self::Batch>, upper: AntichainRef<Self::Time>, frontier: &mut Antichain<Self::Time>, readied: &mut Vec<Self::Batch>, kept: &mut Vec<Self::Batch>, stash: &mut Vec<Self::Batch>) {
let mut keep = self.empty(stash);
let mut ready = self.empty(stash);

Expand Down Expand Up @@ -377,6 +384,43 @@ impl<D: Ord+Clone+'static, T: Ord + PartialOrder + Clone+'static,R: Semigroup+'s
}
}

fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(chain: &mut Vec<Self::Batch>, lower: AntichainRef<Self::Time>, upper: AntichainRef<Self::Time>, since: AntichainRef<Self::Time>) -> B::Output {
let mut builder = {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for buffer in chain.iter() {
for ((key, val), time, _) in buffer.iter() {
if !upper.less_equal(time) {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
}
else if p_val != val {
vals += 1;
}
upds += 1;
} else {
keys += 1;
vals += 1;
upds += 1;
}
prev_keyval = Some((key, val));
}
}
}
B::with_capacity(keys, vals, upds)
};

for datum in chain.drain(..).flatten() {
builder.push(datum);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}

/// Reports accounting information for `batch` as a 4-tuple.
///
/// Only the first component (the number of records, i.e. `batch.len()`) is
/// populated; the remaining three components are always reported as zero.
fn account(batch: &Self::Batch) -> (usize, usize, usize, usize) {
    let records = batch.len();
    (records, 0, 0, 0)
}
Expand Down
Loading

0 comments on commit b07aa9e

Please sign in to comment.