Skip to content

Commit 43a59b9

Browse files
committed
Separate max bucket ratio for SMJ and SHJ and add OOM related documentation
1 parent a18f33f commit 43a59b9

File tree

4 files changed

+39
-19
lines changed

4 files changed

+39
-19
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2656,8 +2656,8 @@ object SQLConf {
26562656
.booleanConf
26572657
.createWithDefault(false)
26582658

2659-
val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
2660-
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
2659+
val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
2660+
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
26612661
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
26622662
"equal to this value for bucket coalescing to be applied. This configuration only " +
26632663
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
@@ -2666,6 +2666,18 @@ object SQLConf {
26662666
.checkValue(_ > 0, "The difference must be positive.")
26672667
.createWithDefault(4)
26682668

2669+
val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO =
2670+
buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio")
2671+
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
2672+
"equal to this value for bucket coalescing to be applied. This configuration only " +
2673+
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " +
2674+
"Note as coalescing reduces parallelism, there might be a higher risk for " +
2675+
"out of memory error at shuffled hash join build side.")
2676+
.version("3.1.0")
2677+
.intConf
2678+
.checkValue(_ > 0, "The difference must be positive.")
2679+
.createWithDefault(4)
2680+
26692681
/**
26702682
* Holds information about keys that have been deprecated.
26712683
*
@@ -3263,8 +3275,11 @@ class SQLConf extends Serializable with Logging {
32633275

32643276
def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)
32653277

3266-
def coalesceBucketsInJoinMaxBucketRatio: Int =
3267-
getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)
3278+
def coalesceBucketsInSortMergeJoinMaxBucketRatio: Int =
3279+
getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)
3280+
3281+
def coalesceBucketsInShuffledHashJoinMaxBucketRatio: Int =
3282+
getConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO)
32683283

32693284
/** ********************** SQLConf functionality methods ************ */
32703285

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ import org.apache.spark.sql.internal.SQLConf
3636
* - The larger bucket number is divisible by the smaller bucket number.
3737
* - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
3838
* - The ratio of the number of buckets is less than the value set in
39-
* COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
39+
* COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or,
40+
* COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
4041
*/
4142
case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
4243
private def updateNumCoalescedBuckets(
@@ -83,17 +84,19 @@ case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
8384
}
8485

8586
plan transform {
86-
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
87-
if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
88-
conf.coalesceBucketsInJoinMaxBucketRatio =>
87+
case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) =>
88+
val bucketRatio = math.max(numLeftBuckets, numRightBuckets) /
89+
math.min(numLeftBuckets, numRightBuckets)
8990
val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
9091
join match {
91-
case j: SortMergeJoinExec =>
92+
case j: SortMergeJoinExec
93+
if bucketRatio <= conf.coalesceBucketsInSortMergeJoinMaxBucketRatio =>
9294
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
9395
case j: ShuffledHashJoinExec
9496
// Only coalesce the buckets for shuffled hash join stream side,
9597
// to avoid OOM for build side.
96-
if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
98+
if bucketRatio <= conf.coalesceBucketsInShuffledHashJoinMaxBucketRatio &&
99+
isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
97100
updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
98101
case other => other
99102
}

sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,14 +167,16 @@ class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
167167
}
168168

169169
test("the ratio of the number of buckets is greater than max allowed") {
170-
withSQLConf(
171-
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
172-
SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") {
173-
run(JoinSetting(
174-
RelationSetting(4, None), RelationSetting(16, None), joinOperator = sortMergeJoin))
175-
run(JoinSetting(
176-
RelationSetting(4, None), RelationSetting(16, None), joinOperator = shuffledHashJoin,
177-
shjBuildSide = Some(BuildLeft)))
170+
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
171+
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
172+
run(JoinSetting(
173+
RelationSetting(4, None), RelationSetting(16, None), joinOperator = sortMergeJoin))
174+
}
175+
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO.key -> "2") {
176+
run(JoinSetting(
177+
RelationSetting(4, None), RelationSetting(16, None), joinOperator = shuffledHashJoin,
178+
shjBuildSide = Some(BuildLeft)))
179+
}
178180
}
179181
}
180182

sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -922,7 +922,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
922922

923923
withSQLConf(
924924
SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true",
925-
SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") {
925+
SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") {
926926
// Coalescing buckets is not applied because the ratio of the number of buckets (3)
927927
// is greater than max allowed (2).
928928
run(

0 commit comments

Comments
 (0)