Generalize pattern for planning hash joins. #418

Closed · wants to merge 4 commits
@@ -17,10 +17,11 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.types.{BooleanType, StringType, TimestampType}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.types.BooleanType


object InterpretedPredicate {
def apply(expression: Expression): (Row => Boolean) = {
@@ -37,10 +38,26 @@ trait Predicate extends Expression {
}

trait PredicateHelper {
def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => other :: Nil
protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
condition match {
case And(cond1, cond2) =>
splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
case other => other :: Nil
}
}
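// For example (illustration only, not part of this patch):
//   splitConjunctivePredicates(And(And(a, b), c)) == Seq(a, b, c)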

/**
* Returns true if `expr` can be evaluated using only the output of `plan`. This method
* can be used to determine when it is acceptable to move expression evaluation within a
* query plan.
*
* For example, consider a join between two relations R(a, b) and S(c, d).
*
* `canEvaluate(Equals(a,b), R)` returns `true`, whereas `canEvaluate(Equals(a,c), R)` returns
* `false`.
*/
protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
expr.references.subsetOf(plan.outputSet)
}

abstract class BinaryPredicate extends BinaryExpression with Predicate {
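As a quick illustration of the new canEvaluate helper above (a hypothetical sketch, not part of this patch: CanEvaluateExample and demo are invented names, and it assumes Catalyst's AttributeReference, Equals, and LocalRelation as they exist in this codebase):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Equals, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types.IntegerType

object CanEvaluateExample extends PredicateHelper {
  // R(a, b), plus an attribute c belonging to some other relation, mirroring
  // the example in the canEvaluate doc comment.
  val a = AttributeReference("a", IntegerType, nullable = true)()
  val b = AttributeReference("b", IntegerType, nullable = true)()
  val c = AttributeReference("c", IntegerType, nullable = true)()
  val r = LocalRelation(Seq(a, b))

  def demo(): Unit = {
    assert(canEvaluate(Equals(a, b), r))   // true: a and b are both in R's output
    assert(!canEvaluate(Equals(a, c), r))  // false: c is not produced by R
  }
}

Because references and outputSet are both attribute sets, the check is a simple subset test, regardless of how deeply the expression nests.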
@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning

import scala.annotation.tailrec

import org.apache.spark.sql.Logging

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._

/**
@@ -101,6 +104,55 @@ object PhysicalOperation extends PredicateHelper {
}
}

/**
* A pattern that finds joins with equality conditions that can be evaluated using hashing
* techniques. For inner joins, any filters on top of the join operator are also matched.
*/
object HashFilteredJoin extends Logging with PredicateHelper {
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

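// For example (illustration only): given the inner join
//   SELECT * FROM R JOIN S WHERE R.a = S.c AND R.b > 5
// unapply yields leftKeys = [R.a] and rightKeys = [S.c], with `R.b > 5` left as the
// residual filter condition to evaluate on top of the join.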
def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
// All predicates can be evaluated for inner join (i.e., those in the ON
// clause and the WHERE clause).
case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
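// For other join types, only the predicates in the ON clause are considered,
// since filters above an outer join cannot simply be pushed into it.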
case join @ Join(left, right, joinType, condition) =>
logger.debug(s"Considering hash join on: $condition")
splitPredicates(condition.toSeq, join)
case _ => None
}

// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
val Join(left, right, joinType, _) = join
val (joinPredicates, otherPredicates) = allPredicates.partition {
case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case _ => false
}

val joinKeys = joinPredicates.map {
case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}

// Do not consider this strategy if there are no join keys.
if (joinKeys.nonEmpty) {
val leftKeys = joinKeys.map(_._1)
val rightKeys = joinKeys.map(_._2)

Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
} else {
logger.debug(s"Avoiding hash join with no join keys.")
None
}
}
}

/**
* A pattern that collects all adjacent unions and returns their children as a Seq.
*/
@@ -28,51 +28,16 @@ import org.apache.spark.sql.parquet._
abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>

object HashJoin extends Strategy {
object HashJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) =>
logger.debug(s"Considering join: ${predicates ++ condition}")
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys. Note we can mix the conditions in with the other predicates only because
// the match above ensures that this is an Inner join.
val (joinPredicates, otherPredicates) = (predicates ++ condition).partition {
case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case _ => false
}

val joinKeys = joinPredicates.map {
case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}

// Do not consider this strategy if there are no join keys.
if (joinKeys.nonEmpty) {
val leftKeys = joinKeys.map(_._1)
val rightKeys = joinKeys.map(_._2)

val joinOp = execution.HashJoin(
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))

// Make sure other conditions are met if present.
if (otherPredicates.nonEmpty) {
execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil
} else {
joinOp :: Nil
}
} else {
logger.debug(s"Avoiding spark join with no join keys.")
Nil
}
// Find inner joins where at least some predicates can be evaluated by matching hash keys
// using the HashFilteredJoin pattern.
case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
Contributor comment: maybe add some comment explaining what's happening here...

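// Plan the hash join using the equi-join keys extracted by the pattern; any
// residual predicates become a Filter on top of the join.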
val hashJoin =
execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
case _ => Nil
}

private def combineConjunctivePredicates(predicates: Seq[Expression]) =
predicates.reduceLeft(And)

/** Returns true if `expr` can be evaluated using only the output of `plan`. */
protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
expr.references subsetOf plan.outputSet
}

object PartialAggregation extends Strategy {