[SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning#28676
[SPARK-31869][SQL] BroadcastHashJoinExec can utilize the build side for its output partitioning#28676imback82 wants to merge 19 commits intoapache:masterfrom
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Outdated
Show resolved
Hide resolved
|
Test build #123310 has finished for PR 28676 at commit
|
|
retest this please |
|
Test build #123313 has finished for PR 28676 at commit
|
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
|
Test build #123326 has finished for PR 28676 at commit
|
|
retest this please |
|
Test build #123334 has finished for PR 28676 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Outdated
Show resolved
Hide resolved
|
Test build #124528 has finished for PR 28676 at commit
|
|
retest this please |
|
Test build #124561 has finished for PR 28676 at commit
|
|
retest this please |
|
Test build #124565 has finished for PR 28676 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Outdated
Show resolved
Hide resolved
|
Test build #124672 has started for PR 28676 at commit |
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Outdated
Show resolved
Hide resolved
| generateExprCombinations(current.tail, accumulated :+ current.head) ++ | ||
| buildKeys.map { bKeys => | ||
| bKeys.flatMap { bKey => | ||
| if (currentNumCombinations < maxNumCombinations) { |
There was a problem hiding this comment.
do we need this if? I think generateExprCombinations will return Nil if hitting the upper bound.
There was a problem hiding this comment.
Wanted to avoid unnecessary recursion (+ not creating new Seq, etc.), but I removed the check for simplicity.
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
Outdated
Show resolved
Hide resolved
|
Test build #125947 has finished for PR 28676 at commit
|
|
Test build #126004 has finished for PR 28676 at commit
|
viirya
left a comment
There was a problem hiding this comment.
Can you also mention the config in the description?
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
|
Test build #126067 has finished for PR 28676 at commit
|
|
Test build #126068 has finished for PR 28676 at commit
|
|
Test build #126075 has finished for PR 28676 at commit
|
|
retest this please |
|
Test build #126087 has finished for PR 28676 at commit
|
|
retest this please |
|
Test build #126091 has finished for PR 28676 at commit
|
|
retest this please |
|
Test build #126097 has finished for PR 28676 at commit
|
|
thanks, merging to master! |
What changes were proposed in this pull request?
Currently, the
BroadcastHashJoinExec'soutputPartitioningonly uses the streamed side'soutputPartitioning. However, if the join type ofBroadcastHashJoinExecis an inner-like join, the build side's info (the join keys) can be added toBroadcastHashJoinExec'soutputPartitioning.For example,
You see that
Exchange hashpartitioning(i2#103, 200)is introduced because there is no output partitioning info from the build side.This PR proposes to introduce output partitioning for the build side for
BroadcastHashJoinExecif the streamed side has aHashPartitioningor a collection ofHashPartitionings.There is a new internal config
spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit, which can limit the number of partitioning aHashPartitioningcan expand to. It can be set to "0" to disable this feature.Why are the changes needed?
To remove unnecessary shuffle.
Does this PR introduce any user-facing change?
Yes, now the shuffle in the above example can be eliminated:
How was this patch tested?
Added new tests.