Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

join, half_join: add lower-level interfaces #327

Merged
merged 6 commits on Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 63 additions & 11 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@ use differential_dataflow::consolidation::{consolidate, consolidate_updates};
/// ((key, val1, time1), initial_time, diff1)
/// ```
///
/// where `initial_time` is less or equal to `time`, and produces as output
/// where `initial_time` is less or equal to `time1`, and produces as output
///
/// ```ignore
/// ((key, (val1, val2), lub(time1, time2)), initial_time, diff1 * diff2)
/// ((output_func(key, val1, val2), lub(time1, time2)), initial_time, diff1 * diff2)
/// ```
///
/// for each `((key, val2), time2, diff2)` present in `arrangement, where
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
/// This last constraint is important to ensure that we correctly produce
/// all pairs of output updates across multiple `half_join` operators.
///
/// Notice that the time is hoisted up into data. The expectation is that
/// once out of the dataflow, the updates will be `delay`d to the times
/// specified in the payloads.
/// once out of the "delta flow region", the updates will be `delay`d to the
/// times specified in the payloads.
pub fn half_join<G, V, Tr, FF, CF, DOut, S>(
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::R>,
mut arrangement: Arranged<G, Tr>,
arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
mut output_func: S,
Expand All @@ -89,6 +89,57 @@ where
DOut: Clone+'static,
Tr::R: std::ops::Mul<Tr::R, Output=Tr::R>,
S: FnMut(&Tr::Key, &V, &Tr::Val)->DOut+'static,
{
let output_func = move |k: &Tr::Key, v1: &V, v2: &Tr::Val, initial: &G::Timestamp, time: &G::Timestamp, diff1: &Tr::R, diff2: &Tr::R| {
let diff = diff1.clone() * diff2.clone();
let dout = (output_func(k, v1, v2), time.clone());
Some((dout, initial.clone(), diff))
};
half_join_internal_unsafe(stream, arrangement, frontier_func, comparison, output_func)
}

/// An unsafe variant of `half_join` where the `output_func` closure takes
/// additional arguments for `time` and `diff` as input and returns an iterator
/// over `(data, time, diff)` triplets. This allows for more flexibility, but
/// is more error-prone.
///
/// This operator responds to inputs of the form
///
/// ```ignore
/// ((key, val1, time1), initial_time, diff1)
/// ```
///
/// where `initial_time` is less or equal to `time1`, and produces as output
///
/// ```ignore
/// output_func(key, val1, val2, initial_time, lub(time1, time2), diff1, diff2)
/// ```
///
/// for each `((key, val2), time2, diff2)` present in `arrangement`, where
/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*.
pub fn half_join_internal_unsafe<G, V, Tr, FF, CF, DOut, ROut, I, S>(
stream: &Collection<G, (Tr::Key, V, G::Timestamp), Tr::R>,
mut arrangement: Arranged<G, Tr>,
frontier_func: FF,
comparison: CF,
mut output_func: S,
) -> Collection<G, DOut, ROut>
where
G: Scope,
G::Timestamp: Lattice,
V: ExchangeData,
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
Tr::Key: Ord+Hashable+ExchangeData,
Tr::Val: Clone,
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
Tr::R: Monoid+ExchangeData,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Monoid,
I: IntoIterator<Item=(DOut, G::Timestamp, ROut)>,
S: FnMut(&Tr::Key, &V, &Tr::Val, &G::Timestamp, &G::Timestamp, &Tr::R, &Tr::R)-> I + 'static,
{
// No need to block physical merging for this operator.
arrangement.trace.set_physical_compaction(Antichain::new().borrow());
Expand Down Expand Up @@ -132,7 +183,7 @@ where

let (mut cursor, storage) = trace.cursor();

for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff) in proposals.iter_mut() {
for &mut ((ref key, ref val1, ref time), ref initial, ref mut diff1) in proposals.iter_mut() {
// Use TOTAL ORDER to allow the release of `time`.
if !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) {
cursor.seek_key(&storage, &key);
Expand All @@ -144,15 +195,16 @@ where
}
});
consolidate(&mut output_buffer);
for (time, count) in output_buffer.drain(..) {
let dout = output_func(key, val1, val2);
session.give(((dout, time), initial.clone(), count * diff.clone()));
for (time, diff2) in output_buffer.drain(..) {
for dout in output_func(key, val1, val2, initial, &time, &diff1, &diff2) {
session.give(dout);
}
}
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff = Tr::R::zero();
*diff1 = Tr::R::zero();
}
}

Expand Down
119 changes: 105 additions & 14 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ pub trait Join<G: Scope, K: Data, V: Data, R: Semigroup> {
/// ```
fn semijoin<R2>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup;

/// Subtracts the semijoin with `other` from `self`.
///
/// In the case that `other` has multiplicities zero or one this results
Expand Down Expand Up @@ -217,10 +218,12 @@ where
/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
/// the arrangement is available for re-use, or from the output of a `group` operator.
pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Timestamp: Lattice+Ord {

/// Joins two arranged collections with the same key type.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have
/// which produces something implementing `IntoIterator`, where the output collection will have an entry for
/// every value returned by the iterator.
///
/// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
/// contains the implementations for collections.
Expand Down Expand Up @@ -265,6 +268,58 @@ pub trait JoinCore<G: Scope, K: 'static, V: 'static, R: Semigroup> where G::Time
I::Item: Data,
L: FnMut(&K,&V,&Tr2::Val)->I+'static,
;

/// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and
/// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more
/// flexibility, but is more error-prone.
///
/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
/// which produces something implementing `IntoIterator`, where the output collection will have an entry
/// for every value returned by the iterator.
///
/// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
/// contains the implementations for collections.
///
/// # Examples
///
/// ```
/// extern crate timely;
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::arrange::ArrangeByKey;
/// use differential_dataflow::operators::join::JoinCore;
/// use differential_dataflow::trace::Trace;
/// use differential_dataflow::trace::implementations::ord::OrdValSpine;
///
/// fn main() {
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
/// .arrange_by_key();
/// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
/// .arrange_by_key();
///
/// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
///
/// // Returned values have weight `a`
/// x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)))
/// .assert_eq(&z);
/// });
/// }
/// ```
fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
D: Data,
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static,
;
}


Expand Down Expand Up @@ -292,6 +347,23 @@ where
self.arrange_by_key()
.join_core(stream2, result)
}

fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=K, Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<K, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
R: Semigroup,
D: Data,
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::R)->I+'static,
{
self.arrange_by_key().join_core_internal_unsafe(stream2, result)
}

}

impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
Expand All @@ -316,8 +388,28 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
<T1::R as Multiply<Tr2::R>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static {
L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static
{
let result = move |k: &T1::Key, v1: &T1::Val, v2: &Tr2::Val, t: &G::Timestamp, r1: &T1::R, r2: &Tr2::R| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
self.join_core_internal_unsafe(other, result)
}

fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,D,ROut>
where
Tr2: TraceReader<Key=T1::Key, Time=G::Timestamp>+Clone+'static,
Tr2::Batch: BatchReader<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Cursor: Cursor<T1::Key, Tr2::Val, G::Timestamp, Tr2::R>+'static,
Tr2::Val: Ord+Clone+Debug+'static,
Tr2::R: Semigroup,
D: Data,
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::R,&Tr2::R)->I+'static,
{
// Rename traces for symmetry from here on out.
let mut trace1 = self.trace.clone();
let mut trace2 = other.trace.clone();
Expand Down Expand Up @@ -488,8 +580,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
while !todo1.is_empty() && fuel > 0 {
todo1.front_mut().unwrap().work(
output,
|k,v2,v1| result(k,v1,v2),
|r2,r1| (r1.clone()).multiply(r2),
|k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2),
&mut fuel
);
if !todo1.front().unwrap().work_remains() { todo1.pop_front(); }
Expand All @@ -500,8 +591,7 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
while !todo2.is_empty() && fuel > 0 {
todo2.front_mut().unwrap().work(
output,
|k,v1,v2| result(k,v1,v2),
|r1,r2| (r1.clone()).multiply(r2),
|k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2),
&mut fuel
);
if !todo2.front().unwrap().work_remains() { todo2.pop_front(); }
Expand Down Expand Up @@ -594,7 +684,7 @@ where
R3: Semigroup,
C1: Cursor<K, V1, T, R1>,
C2: Cursor<K, V2, T, R2>,
D: Ord+Clone+Data,
D: Clone+Data,
{
fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
Deferred {
Expand All @@ -613,10 +703,10 @@ where
!self.done
}

/// Process keys until at least `limit` output tuples produced, or the work is exhausted.
/// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
#[inline(never)]
fn work<L, M, I>(&mut self, output: &mut OutputHandle<T, (D, T, R3), Tee<T, (D, T, R3)>>, mut logic: L, mut mult: M, fuel: &mut usize)
where I: IntoIterator<Item=D>, L: FnMut(&K, &V1, &V2)->I, M: FnMut(&R1,&R2)->R3 {
fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R3), Tee<T, (D, T, R3)>>, mut logic: L, fuel: &mut usize)
where I: IntoIterator<Item=(D, T, R3)>, L: FnMut(&K, &V1, &V2, &T, &R1, &R2)->I {

let meet = self.capability.time();

Expand Down Expand Up @@ -645,11 +735,12 @@ where
assert_eq!(temp.len(), 0);

// populate `temp` with the results in the best way we know how.
thinker.think(|v1,v2,t,r1,r2|
for result in logic(batch.key(batch_storage), v1, v2) {
temp.push(((result, t.clone()), mult(r1, r2)));
thinker.think(|v1,v2,t,r1,r2| {
let key = batch.key(batch_storage);
for (d, t, r) in logic(key, v1, v2, &t, r1, r2) {
temp.push(((d, t), r));
}
);
});

// TODO: This consolidation is optional, and it may not be very
// helpful. We might try harder to understand whether we
Expand Down