Skip to content

Commit

Permalink
[SPARK-35984][SQL][TEST] Config to force applying shuffled hash join
Browse files Browse the repository at this point in the history
Add a config `spark.sql.join.forceApplyShuffledHashJoin` to force applying shuffled hash join
during the join selection.

In `SQLQueryTestSuite`, we want to cover 3 kinds of join (BHJ, SHJ, SMJ) in join.sql. But even
if `spark.sql.join.preferSortMergeJoin` is set to `false`, a shuffled hash join is still not guaranteed.
Thus, we need another config to force the selection.

No, only for testing

newly added tests
Verified that all queries in join.sql use `ShuffledHashJoin` when the config is set to `true`

Closes #33182 from linhongliu-db/SPARK-35984-hash-join-config.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
linhongliu-db authored and wakun committed Aug 28, 2022
1 parent 961174e commit bc1dd6d
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ object BroadcastJoinOuterJoinStreamSide extends Rule[LogicalPlan] with JoinSelec
LeftOuter | LeftSemi | LeftAnti, _, _) =>
j
case j @ ExtractEquiJoinKeys(LeftOuter | LeftSemi | LeftAnti,
leftKeys, _, None, left, right, hint) if leftKeys.nonEmpty && muchSmaller(left, right) &&
leftKeys, _, None, left, right, hint)
if leftKeys.nonEmpty && muchSmaller(left, right, conf) &&
!(hintToBroadcastRight(hint) || canBroadcastBySize(right, conf)) &&
(hintToBroadcastLeft(hint) || canBroadcastBySize(left, conf)) =>
logInfo("BroadcastJoinOuterJoinStreamSide detected.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
* Reorder the joins and push all the conditions into join, so that the bottom ones have at least
Expand Down Expand Up @@ -270,28 +271,18 @@ trait JoinSelectionHelper {
val buildLeft = if (hintOnly) {
hintToShuffleHashJoinLeft(hint)
} else {
if (hintToPreferShuffleHashJoinLeft(hint)) {
true
} else {
if (!conf.preferSortMergeJoin) {
canBuildLocalHashMapBySize(left, conf) && muchSmaller(left, right)
} else {
false
}
}
hintToPreferShuffleHashJoinLeft(hint) ||
(!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) &&
muchSmaller(left, right)) ||
forceApplyShuffledHashJoin(conf)
}
val buildRight = if (hintOnly) {
hintToShuffleHashJoinRight(hint)
} else {
if (hintToPreferShuffleHashJoinRight(hint)) {
true
} else {
if (!conf.preferSortMergeJoin) {
canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)
} else {
false
}
}
hintToPreferShuffleHashJoinRight(hint) ||
(!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) &&
muchSmaller(right, left)) ||
forceApplyShuffledHashJoin(conf)
}
getBuildSide(
canBuildShuffledHashJoinLeft(joinType) && buildLeft,
Expand Down Expand Up @@ -435,8 +426,8 @@ trait JoinSelectionHelper {
* that is much smaller than the other one. Since we do not have statistics for the number of rows,
* use the size in bytes here as an estimate.
*/
def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
def muchSmaller(a: LogicalPlan, b: LogicalPlan, conf: SQLConf): Boolean = {
a.stats.sizeInBytes * conf.getConf(SQLConf.SHUFFLE_HASH_JOIN_FACTOR) <= b.stats.sizeInBytes
}

def canBroadcastTokenTree(left: LogicalPlan,
Expand All @@ -460,5 +451,14 @@ trait JoinSelectionHelper {
right.stats.sizeInBytes <= conf.containsJoinThreshold &&
!hintToNotBroadcastRight(hint)
}

/**
 * Testing-only switch that forces join selection to apply a shuffled hash join.
 *
 * Evaluates to true only when `Utils.isTesting` holds and the conf entry
 * `spark.sql.join.forceApplyShuffledHashJoin` is exactly the string "true"; the entry
 * defaults to "false" when unset. The key is hard-coded here because it is for testing
 * only and should not be exposed.
 */
private def forceApplyShuffledHashJoin(conf: SQLConf): Boolean = {
  if (Utils.isTesting) {
    val forced = conf.getConfString("spark.sql.join.forceApplyShuffledHashJoin", "false")
    forced == "true"
  } else {
    // Outside of tests the conf is not consulted at all.
    false
  }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=10485760
--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=true
--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false
--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.forceApplyShuffledHashJoin=true

--CONFIG_DIM2 spark.sql.codegen.wholeStage=true
--CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
Expand Down
8 changes: 8 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1603,4 +1603,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
}
}

test("SPARK-35984: Config to force applying shuffled hash join") {
  // The same query is checked twice: first as-is, then with the force config enabled.
  val query = "SELECT * FROM testData JOIN testData2 ON key = a"
  val forceShjConf = "spark.sql.join.forceApplyShuffledHashJoin" -> "true"

  // Without the override, the join is expected to be a sort merge join.
  assertJoin(query, classOf[SortMergeJoinExec])

  // With the override, the very same query must switch to a shuffled hash join.
  withSQLConf(forceShjConf) {
    assertJoin(query, classOf[ShuffledHashJoinExec])
  }
}
}

0 comments on commit bc1dd6d

Please sign in to comment.