Skip to content

Commit 64a95d1

Browse files
committed
Coalesce bucketed table for shuffled hash join if applicable
1 parent 004aea8 commit 64a95d1

File tree

7 files changed

+287
-177
lines changed

7 files changed

+287
-177
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2645,21 +2645,22 @@ object SQLConf {
26452645
.booleanConf
26462646
.createWithDefault(true)
26472647

2648-
val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
2649-
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
2648+
val COALESCE_BUCKETS_IN_JOIN_ENABLED =
2649+
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
26502650
.doc("When true, if two bucketed tables with the different number of buckets are joined, " +
26512651
"the side with a bigger number of buckets will be coalesced to have the same number " +
2652-
"of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
2653-
"and only when the bigger number of buckets is divisible by the smaller number of buckets.")
2652+
"of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
2653+
"number of buckets. Bucket coalescing is applied to sort-merge joins and " +
2654+
"shuffled hash join.")
26542655
.version("3.1.0")
26552656
.booleanConf
26562657
.createWithDefault(false)
26572658

2658-
val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
2659-
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
2659+
val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
2660+
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
26602661
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
26612662
"equal to this value for bucket coalescing to be applied. This configuration only " +
2662-
s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
2663+
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
26632664
.version("3.1.0")
26642665
.intConf
26652666
.checkValue(_ > 0, "The difference must be positive.")
@@ -3260,6 +3261,11 @@ class SQLConf extends Serializable with Logging {
32603261
def legacyAllowCastNumericToTimestamp: Boolean =
32613262
getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP)
32623263

3264+
def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)
3265+
3266+
def coalesceBucketsInJoinMaxBucketRatio: Int =
3267+
getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)
3268+
32633269
/** ********************** SQLConf functionality methods ************ */
32643270

32653271
/** Set Spark SQL configuration properties. */

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
3434
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
3535
import org.apache.spark.sql.catalyst.util.truncatedString
3636
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
37-
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInSortMergeJoin
37+
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
3838
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
3939
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
4040
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -332,7 +332,7 @@ object QueryExecution {
332332
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
333333
adaptiveExecutionRule.toSeq ++
334334
Seq(
335-
CoalesceBucketsInSortMergeJoin(sparkSession.sessionState.conf),
335+
CoalesceBucketsInJoin(sparkSession.sessionState.conf),
336336
PlanDynamicPruningFilters(sparkSession),
337337
PlanSubqueries(sparkSession),
338338
EnsureRequirements(sparkSession.sessionState.conf),
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.bucketing
19+
20+
import scala.annotation.tailrec
21+
22+
import org.apache.spark.sql.catalyst.catalog.BucketSpec
23+
import org.apache.spark.sql.catalyst.expressions.Expression
24+
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
25+
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
26+
import org.apache.spark.sql.catalyst.rules.Rule
27+
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
28+
import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
29+
import org.apache.spark.sql.internal.SQLConf
30+
31+
/**
32+
* This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
33+
* if the following conditions are met:
34+
* - Two bucketed tables are joined.
35+
* - Join keys match with output partition expressions on their respective sides.
36+
* - The larger bucket number is divisible by the smaller bucket number.
37+
* - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
38+
* - The ratio of the number of buckets is less than the value set in
39+
* COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
40+
*/
41+
case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
42+
private def updateNumCoalescedBuckets(
43+
join: BaseJoinExec,
44+
numLeftBuckets: Int,
45+
numRightBucket: Int,
46+
numCoalescedBuckets: Int): BaseJoinExec = {
47+
if (numCoalescedBuckets != numLeftBuckets) {
48+
val leftCoalescedChild = join.left transformUp {
49+
case f: FileSourceScanExec =>
50+
f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
51+
}
52+
join match {
53+
case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
54+
case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
55+
}
56+
} else {
57+
val rightCoalescedChild = join.right transformUp {
58+
case f: FileSourceScanExec =>
59+
f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
60+
}
61+
join match {
62+
case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
63+
case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
64+
}
65+
}
66+
}
67+
68+
private def isCoalesceSHJStreamSide(
69+
join: ShuffledHashJoinExec,
70+
numLeftBuckets: Int,
71+
numRightBucket: Int,
72+
numCoalescedBuckets: Int): Boolean = {
73+
if (numCoalescedBuckets == numLeftBuckets) {
74+
join.buildSide != BuildRight
75+
} else {
76+
join.buildSide != BuildLeft
77+
}
78+
}
79+
80+
def apply(plan: SparkPlan): SparkPlan = {
81+
if (!conf.coalesceBucketsInJoinEnabled) {
82+
return plan
83+
}
84+
85+
plan transform {
86+
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
87+
if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
88+
conf.coalesceBucketsInJoinMaxBucketRatio =>
89+
val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
90+
join match {
91+
case j: SortMergeJoinExec =>
92+
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
93+
case j: ShuffledHashJoinExec
94+
// Only coalesce the buckets for shuffled hash join stream side,
95+
// to avoid OOM for build side.
96+
if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
97+
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
98+
case other => other
99+
}
100+
case other => other
101+
}
102+
}
103+
}
104+
105+
/**
106+
* An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
107+
* where both sides of the join have the bucketed tables,
108+
* are consisted of only the scan operation,
109+
* and numbers of buckets are not equal but divisible.
110+
*/
111+
object ExtractJoinWithBuckets {
112+
@tailrec
113+
private def isScanOperation(plan: SparkPlan): Boolean = plan match {
114+
case f: FilterExec => isScanOperation(f.child)
115+
case p: ProjectExec => isScanOperation(p.child)
116+
case _: FileSourceScanExec => true
117+
case _ => false
118+
}
119+
120+
private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
121+
plan.collectFirst {
122+
case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
123+
f.optionalNumCoalescedBuckets.isEmpty =>
124+
f.relation.bucketSpec.get
125+
}
126+
}
127+
128+
/**
129+
* The join keys should match with expressions for output partitioning. Note that
130+
* the ordering does not matter because it will be handled in `EnsureRequirements`.
131+
*/
132+
private def satisfiesOutputPartitioning(
133+
keys: Seq[Expression],
134+
partitioning: Partitioning): Boolean = {
135+
partitioning match {
136+
case HashPartitioning(exprs, _) if exprs.length == keys.length =>
137+
exprs.forall(e => keys.exists(_.semanticEquals(e)))
138+
case _ => false
139+
}
140+
}
141+
142+
private def isApplicable(j: BaseJoinExec): Boolean = {
143+
(j.isInstanceOf[SortMergeJoinExec] ||
144+
j.isInstanceOf[ShuffledHashJoinExec]) &&
145+
isScanOperation(j.left) &&
146+
isScanOperation(j.right) &&
147+
satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
148+
satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
149+
}
150+
151+
private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = {
152+
val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
153+
// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
154+
// number of buckets because bucket id is calculated by modding the total number of buckets.
155+
numBuckets1 != numBuckets2 && large % small == 0
156+
}
157+
158+
def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = {
159+
plan match {
160+
case s: BaseJoinExec if isApplicable(s) =>
161+
val leftBucket = getBucketSpec(s.left)
162+
val rightBucket = getBucketSpec(s.right)
163+
if (leftBucket.isDefined && rightBucket.isDefined &&
164+
isDivisible(leftBucket.get.numBuckets, rightBucket.get.numBuckets)) {
165+
Some(s, leftBucket.get.numBuckets, rightBucket.get.numBuckets)
166+
} else {
167+
None
168+
}
169+
case _ => None
170+
}
171+
}
172+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala

Lines changed: 0 additions & 132 deletions
This file was deleted.

sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
347347
test("Coalesced bucket info should be a part of explain string") {
348348
withTable("t1", "t2") {
349349
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
350-
SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
350+
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
351351
Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1")
352352
Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2")
353353
val df1 = spark.table("t1")

0 commit comments

Comments
 (0)