Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT]: sql cross join #3110

Merged
merged 11 commits into from
Oct 28, 2024
Merged

Conversation

universalmind303
Copy link
Contributor

@universalmind303 universalmind303 commented Oct 23, 2024

still todo:

  • add tests

Notes for reviewers:

This does not actually implement a physical cross join, but just implements the logical cross join as well as cross join to inner join optimization eliminate_cross_join.rs

This treats an inner join with no join conditions as cross join. (inspired by a recent change in datafusion).

If the cross join can not be optimized away, an error will be raised when attempting to execute the plan.

@github-actions github-actions bot added the enhancement New feature or request label Oct 23, 2024
Copy link

codspeed-hq bot commented Oct 23, 2024

CodSpeed Performance Report

Merging #3110 will not alter performance

Comparing universalmind303:cross-join (3dcafcb) with main (5228930)

Summary

✅ 17 untouched benchmarks

@universalmind303 universalmind303 marked this pull request as ready for review October 24, 2024 17:46
@universalmind303 universalmind303 linked an issue Oct 24, 2024 that may be closed by this pull request
Copy link
Member

@kevinzwang kevinzwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this Cory! The logic largely looks sound but I'm not super happy about the quality of the DataFusion optimizer rule code. I had a good amount of comments but I'm happy to branch off of your changes and make the fixes myself.

Comment on lines +117 to +122
fn rewrite_children(
optimizer: &impl OptimizerRule,
plan: Arc<LogicalPlan>,
) -> DaftResult<Transformed<Arc<LogicalPlan>>> {
plan.map_children(|input| optimizer.try_optimize(input))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of calling rewrite_children to recurse down the plan tree, you can use TreeNode::transform_up or TreeNode::transform_down which will do it for you. Then, in try_optimize we can match on the specific cases that we'd like this rule to apply to, which makes it more clear what plan nodes this rule affects.

Comment on lines 129 to 148
match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
let keys = join.left_on.into_iter().zip(join.right_on);
possible_join_keys.insert_all_owned(keys);
flatten_join_inputs(
Arc::unwrap_or_clone(join.left),
possible_join_keys,
all_inputs,
)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.right),
possible_join_keys,
all_inputs,
)?;
}
_ => {
all_inputs.push(Arc::new(plan));
}
};
Ok(())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can also use TreeNode methods to simplify code here

rights: &mut Vec<LogicalPlanRef>,
possible_join_keys: &JoinKeySet,
all_join_keys: &mut JoinKeySet,
) -> DaftResult<LogicalPlanRef> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note, I don't believe it affects TPC-H, but I believe this function is actually somewhat brittle, since if a join key uses columns from more than two tables, it will not always optimize it, depending on the order in which it has seen the tables.

Consider this case:

filter
   |
cross join
  /  \
a  cross join
    /   \
   b     c

If the filter predicate was a.id + c.id = b.id, then this optimizer rule would first look for a table with columns a.id and c.id and another one with b.id. Not finding one, it would cross join a and b in the first call to find_inner_join, and then cross join that with c in the second call. However, it could have cross joined a and c and then inner joined that with b.

If we wanted to only handle two table predicates, I believe my suggestion in Slack to do filter pushdown + simple single join node into filter optimization would also solve that.

all_inputs,
)?;
}
_ => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We insert projects under joins when there are naming conflicts to rename columns, however we could possibly still flatten joins that have those (and other compute-less) projects between them.

Comment on lines +151 to +176
/// Returns true if the plan is a Join or Cross join could be flattened with
/// `flatten_join_inputs`
///
/// Must stay in sync with `flatten_join_inputs`
fn can_flatten_join_inputs(plan: &LogicalPlan) -> bool {
// can only flatten inner / cross joins
match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {}
_ => return false,
};

for child in plan.children() {
if matches!(
child,
LogicalPlan::Join(Join {
join_strategy: None,
join_type: JoinType::Inner,
..
})
) && !can_flatten_join_inputs(child)
{
return false;
}
}
true
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstrings for this function are not very clear, and based on the logic it basically just checks if the top level plan node is an inner join? Unless the match on the plan is not intended to check for join strategy, the matches! for each child is redundant to the recursive call.

Comment on lines +180 to +202
if let Expr::BinaryOp { left, op, right } = expr {
match op {
Operator::Eq => {
// insert handles ensuring we don't add the same Join keys multiple times
join_keys.insert(left, right);
}
Operator::And => {
extract_possible_join_keys(left, join_keys);
extract_possible_join_keys(right, join_keys);
}
// Fix for join predicates from inside of OR expr also pulled up properly.
Operator::Or => {
let mut left_join_keys = JoinKeySet::new();
let mut right_join_keys = JoinKeySet::new();

extract_possible_join_keys(left, &mut left_join_keys);
extract_possible_join_keys(right, &mut right_join_keys);

join_keys.insert_intersection(&left_join_keys, &right_join_keys);
}
_ => (),
};
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's similar logic to this in other parts of the code already, I believe the only difference is this does not pull out common expressions from two sides of an or, but if we want that, we should probably add it to the same place so other optimizer rules can take advantage as well.

pub fn split_conjuction(expr: &ExprRef) -> Vec<&ExprRef> {

Comment on lines +399 to +432
pub fn can_hash(data_type: &DataType) -> bool {
match data_type {
DataType::Null => true,
DataType::Boolean => true,
DataType::Int8 => true,
DataType::Int16 => true,
DataType::Int32 => true,
DataType::Int64 => true,
DataType::UInt8 => true,
DataType::UInt16 => true,
DataType::UInt32 => true,
DataType::UInt64 => true,
DataType::Float32 => true,
DataType::Float64 => true,
DataType::Timestamp(time_unit, _) => match time_unit {
TimeUnit::Seconds => true,
TimeUnit::Milliseconds => true,
TimeUnit::Microseconds => true,
TimeUnit::Nanoseconds => true,
},
DataType::Utf8 => true,

DataType::Decimal128(_, _) => true,
DataType::Date => true,

DataType::FixedSizeBinary(_) => true,

DataType::List(_) => true,

DataType::FixedSizeList(_, _) => true,
DataType::Struct(fields) => fields.iter().all(|f| can_hash(&f.dtype)),
_ => false,
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, I wonder what the behavior right now is if you specify a join key that is not hashable. Additionally, should this be a method for DataType instead?

@universalmind303 universalmind303 merged commit 1deeaf9 into Eventual-Inc:main Oct 28, 2024
38 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Optimizer: Eliminate Cross Join Rule
2 participants