Skip to content

Commit

Permalink
uncomment and bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
xiacongling committed May 27, 2021
1 parent 070dd1c commit 9850eb1
Showing 1 changed file with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, JoinHint, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{SparkSession, Strategy}

Expand Down Expand Up @@ -154,17 +154,16 @@ case class KylinJoinSelection(session: SparkSession) extends Strategy with Predi

private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan, hint: JoinHint)
: Boolean = {
// val buildLeft = canBuildLeft(joinType) && logical.BROADCAST.equals(hint.leftHint.get.strategy.get)
// val buildRight = canBuildRight(joinType) && logical.BROADCAST.equals(hint.rightHint.get.strategy.get)
// buildLeft || buildRight
false
val buildLeft = canBuildLeft(joinType) && hintBroadcast(hint.leftHint)
val buildRight = canBuildRight(joinType) && hintBroadcast(hint.rightHint)
buildLeft || buildRight
}

private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan, hint: JoinHint)
: BuildSide = {
// val buildLeft = canBuildLeft(joinType) && logical.BROADCAST.equals(hint.leftHint.get.strategy.get)
// val buildRight = canBuildRight(joinType) && logical.BROADCAST.equals(hint.rightHint.get.strategy.get)
broadcastSide(false, false, left, right)
val buildLeft = canBuildLeft(joinType) && hintBroadcast(hint.leftHint)
val buildRight = canBuildRight(joinType) && hintBroadcast(hint.rightHint)
broadcastSide(buildLeft, buildRight, left, right)
}

private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
Expand Down Expand Up @@ -193,22 +192,22 @@ case class KylinJoinSelection(session: SparkSession) extends Strategy with Predi
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint)
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

// --- ShuffledHashJoin ---------------------------------------------------------------------

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint)
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
&& muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint)
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
&& muchSmaller(left, right) ||
!RowOrdering.isOrderable(leftKeys) =>
Expand All @@ -217,7 +216,7 @@ case class KylinJoinSelection(session: SparkSession) extends Strategy with Predi

// --- SortMergeJoin ------------------------------------------------------------

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint)
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
Expand All @@ -231,19 +230,18 @@ case class KylinJoinSelection(session: SparkSession) extends Strategy with Predi
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

case j@logical.Join(left, right, joinType, condition, hint)
case j@logical.Join(left, right, joinType, condition, _)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition, hint) =>
case logical.Join(left, right, _: InnerLike, condition, _) =>
joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

case logical.Join(left, right, joinType, condition, hint) =>
val buildSide = broadcastSide(
false, false, left, right)
val buildSide = broadcastSide(hintBroadcast(hint.leftHint), hintBroadcast(hint.rightHint), left, right)
// This join could be very slow or OOM
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
Expand All @@ -252,6 +250,10 @@ case class KylinJoinSelection(session: SparkSession) extends Strategy with Predi

case _ => Nil
}

private def hintBroadcast(hintInfo: Option[HintInfo]): Boolean = {
hintInfo.map(_.strategy).exists(_.contains(BROADCAST))
}
}

object JoinMemoryManager extends Logging {
Expand Down

0 comments on commit 9850eb1

Please sign in to comment.