Skip to content

Allow the output of a join to be filtered by an arbitrary predicate #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions src/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,50 @@ pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
) {
let mut results = Vec::new();
let push_result = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2));

let recent1 = input1.recent();
let recent2 = input2.recent();

{
// scoped to let `closure` drop borrow of `results`.
join_delta(input1, input2, push_result);

let mut closure = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2));
output.insert(Relation::from_vec(results));
}

for batch2 in input2.stable().iter() {
join_helper(&recent1, &batch2, &mut closure);
/// Joins `input1` with `input2` and inserts the tuples accepted by `logic` into `output`.
///
/// `logic` is called for every matching `(key, val1, val2)` triple reported by `join_delta`.
/// A `Some(result)` return is collected; a `None` return drops that triple. The collected
/// results are inserted into `output` as a single `Relation`.
pub(crate) fn join_and_filter_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
    input1: &Variable<(Key, Val1)>,
    input2: impl JoinInput<'me, (Key, Val2)>,
    output: &Variable<Result>,
    mut logic: impl FnMut(&Key, &Val1, &Val2) -> Option<Result>,
) {
    let mut results = Vec::new();
    let push_result = |k: &Key, v1: &Val1, v2: &Val2| {
        if let Some(result) = logic(k, v1, v2) {
            results.push(result);
        }
    };

    // `push_result` is moved into `join_delta`, so its mutable borrow of `results` ends
    // when the call returns and `results` can be consumed below.
    join_delta(input1, input2, push_result);

    output.insert(Relation::from_vec(results));
}

/// Joins the `recent` tuples of each input with the `stable` tuples of the other, then the
/// `recent` tuples of *both* inputs.
///
/// Every matching `(key, val1, val2)` triple is handed to `result`. This function only
/// reports matches; collecting them and inserting into an output variable is left to the
/// caller. No stable-with-stable join is performed here.
fn join_delta<'me, Key: Ord, Val1: Ord, Val2: Ord>(
    input1: &Variable<(Key, Val1)>,
    input2: impl JoinInput<'me, (Key, Val2)>,
    mut result: impl FnMut(&Key, &Val1, &Val2),
) {
    let recent1 = input1.recent();
    let recent2 = input2.recent();

    // recent(input1) against every stable batch of input2.
    for batch2 in input2.stable().iter() {
        join_helper(&recent1, &batch2, &mut result);
    }

    // Every stable batch of input1 against recent(input2).
    for batch1 in input1.stable().iter() {
        join_helper(&batch1, &recent2, &mut result);
    }

    // recent(input1) against recent(input2).
    join_helper(&recent1, &recent2, &mut result);
}

/// Join, but for two relations.
Expand Down
44 changes: 44 additions & 0 deletions src/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,50 @@ impl<Tuple: Ord> Variable<Tuple> {
join::join_into(input1, input2, self, logic)
}

/// Like [`Variable::from_join`], except that `logic` returns an `Option`: a joined tuple is
/// added only when `logic` yields `Some`, and is discarded when it yields `None`.
///
/// # Examples
///
/// This mirrors the example for `Variable::from_join`, but drops every joined tuple whose
/// two values differ by more than 3. The variable therefore ends up holding all pairs
/// (x, y) with x and y in 0 .. 11 such that |x - y| <= 3.
///
/// ```
/// use datafrog::{Iteration, Relation};
///
/// let mut iteration = Iteration::new();
/// let variable = iteration.variable::<(isize, isize)>("source");
/// variable.extend((0 .. 10).map(|x| (x, x + 1)));
/// variable.extend((0 .. 10).map(|x| (x + 1, x)));
///
/// while iteration.changed() {
///     variable.from_join_filtered(&variable, &variable, |&key, &val1, &val2| {
///         ((val1 - val2).abs() <= 3).then(|| (val1, val2))
///     });
/// }
///
/// let result = variable.complete();
///
/// // Count the pairs in 0 .. 11 whose absolute difference is at most 3.
/// let expected_cnt = (0i32..11)
///     .flat_map(|i| (0i32..11).map(move |j| (i, j)))
///     .filter(|&(i, j)| (i - j).abs() <= 3)
///     .count();
///
/// assert_eq!(result.len(), expected_cnt);
/// ```
pub fn from_join_filtered<'me, K: Ord, V1: Ord, V2: Ord>(
    &self,
    input1: &'me Variable<(K, V1)>,
    input2: impl JoinInput<'me, (K, V2)>,
    logic: impl FnMut(&K, &V1, &V2) -> Option<Tuple>,
) {
    // `self` is the output variable that receives the accepted tuples.
    join::join_and_filter_into(input1, input2, self, logic);
}

/// Adds tuples from `input1` whose key is not present in `input2`.
///
/// Note that `input1` must be a variable: if you have a relation
Expand Down