Skip to content
123 changes: 119 additions & 4 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_common::{internal_err, DFSchema};
use datafusion_common::{NullEquality, Result};
use datafusion_expr::utils::split_conjunction_owned;
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
Expand Down Expand Up @@ -82,6 +82,45 @@ impl OptimizerRule for ExtractEquijoinPredicate {
let (equijoin_predicates, non_equijoin_expr) =
split_eq_and_noneq_join_predicate(expr, left_schema, right_schema)?;

// Equi-join operators like HashJoin support a special behavior
// that evaluates `NULL = NULL` as true instead of NULL. Therefore,
// we transform `t1.c1 IS NOT DISTINCT FROM t2.c1` into an equi-join
// and set the `NullEquality` configuration in the join operator.
// This allows certain queries to use Hash Join instead of
// Nested Loop Join, resulting in better performance.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice comment

//
// Only convert when there are NO equijoin predicates, to be conservative.
if on.is_empty()
&& equijoin_predicates.is_empty()
&& non_equijoin_expr.is_some()
{
// `non_equijoin_expr.is_some()` is verified by the enclosing `if`, so this `unwrap` cannot panic.
let expr = non_equijoin_expr.clone().unwrap();
let (equijoin_predicates, non_equijoin_expr) =
split_is_not_distinct_from_and_other_join_predicate(
expr,
left_schema,
right_schema,
)?;

if !equijoin_predicates.is_empty() {
on.extend(equijoin_predicates);

return Ok(Transformed::yes(LogicalPlan::Join(Join {
left,
right,
on,
filter: non_equijoin_expr,
join_type,
join_constraint,
schema,
// According to `is not distinct from`'s semantics, it's
// safe to override it
null_equality: NullEquality::NullEqualsNull,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

})));
}
}

if !equijoin_predicates.is_empty() {
on.extend(equijoin_predicates);
Ok(Transformed::yes(LogicalPlan::Join(Join {
Expand Down Expand Up @@ -112,22 +151,98 @@ impl OptimizerRule for ExtractEquijoinPredicate {
}
}

/// Partitions a conjunctive join filter into equijoin key pairs and the leftover filter.
///
/// The filter is broken apart at its top-level `AND`s and each conjunct is classified:
///
/// - A conjunct is an equijoin predicate when it is an equality `lhs = rhs` whose two
///   sides form a valid join key pair, i.e. every column referenced by one side comes
///   from `left_schema` and every column referenced by the other comes from
///   `right_schema`.
/// - Every other conjunct is kept as a residual filter.
///
/// # Example
///
/// Given `a.id = b.id AND a.x > 10 AND b.x > b.id`, the conjuncts are:
///
/// | conjunct       | classification                                  |
/// |----------------|-------------------------------------------------|
/// | `a.id = b.id`  | equijoin predicate (one side per input)         |
/// | `a.x > 10`     | residual (not an equality)                      |
/// | `b.x > b.id`   | residual (not an equality)                      |
///
/// so the result is the key pair `(a.id, b.id)` together with
/// `Some(a.x > 10 AND b.x > b.id)`.
///
/// # Returns
///
/// A tuple of the extracted equijoin key pairs and the residual conjuncts re-combined
/// with `AND`.
fn split_eq_and_noneq_join_predicate(
    filter: Expr,
    left_schema: &DFSchema,
    right_schema: &DFSchema,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
    // Plain equality is the operator that identifies a join key here.
    let key_operator = Operator::Eq;
    split_op_and_other_join_predicates(filter, left_schema, right_schema, key_operator)
}

/// Partitions a conjunctive join filter into `IS NOT DISTINCT FROM` key pairs and the
/// leftover filter.
///
/// This is the counterpart of `split_eq_and_noneq_join_predicate`: the filter is split
/// at its top-level `AND`s in the same way, but the conjuncts pulled out as join keys
/// are `lhs IS NOT DISTINCT FROM rhs` comparisons rather than `lhs = rhs` equalities.
/// The extracted comparisons are returned as `EquijoinPredicate` key pairs.
///
/// # Example
///
/// - Input: `a.id IS NOT DISTINCT FROM b.id AND a.x > 10 AND b.x > b.id`
/// - Output: the key pair `(a.id, b.id)` together with
///   `Some((a.x > 10) AND (b.x > b.id))`
///
/// # Note
///
/// `IS NOT DISTINCT FROM` is **not** equivalent to `=`: it evaluates `NULL` compared
/// with `NULL` as true. Using the returned pairs as equijoin keys is therefore only
/// valid if the caller also configures the join operator (when it supports this) so
/// that nulls compare equal, i.e. `NullEquality::NullEqualsNull`.
///
/// For the example above, a valid downstream plan using the extracted keys looks like:
///
/// ```text
/// HashJoin
///   - on: `a.id = b.id` (equality)
///   - join_filter: `(a.x > 10) AND (b.x > b.id)`
///   - nulls_equals_null: `true`
/// ```
///
/// With that setting the semantics of the original predicate are preserved while an
/// equi-join implementation (e.g. HashJoin) can be used.
fn split_is_not_distinct_from_and_other_join_predicate(
    filter: Expr,
    left_schema: &DFSchema,
    right_schema: &DFSchema,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
    // Null-safe equality is the operator that identifies a join key here.
    let key_operator = Operator::IsNotDistinctFrom;
    split_op_and_other_join_predicates(filter, left_schema, right_schema, key_operator)
}

/// See comments in `split_eq_and_noneq_join_predicate` for details.
fn split_op_and_other_join_predicates(
filter: Expr,
left_schema: &DFSchema,
right_schema: &DFSchema,
operator: Operator,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
if !matches!(operator, Operator::Eq | Operator::IsNotDistinctFrom) {
return internal_err!(
"split_op_and_other_join_predicates only supports 'Eq' or 'IsNotDistinctFrom' operators, \
but received: {:?}",
operator
);
}

let exprs = split_conjunction_owned(filter);

// Treat 'is not distinct from' comparison as join key in equal joins
let mut accum_join_keys: Vec<(Expr, Expr)> = vec![];
let mut accum_filters: Vec<Expr> = vec![];
for expr in exprs {
match expr {
Expr::BinaryExpr(BinaryExpr {
ref left,
op: Operator::Eq,
ref op,
ref right,
}) => {
}) if *op == operator => {
let join_key_pair =
find_valid_equijoin_key_pair(left, right, left_schema, right_schema)?;

Expand Down
Loading