Commit 2c4925b

Address comments
1 parent 7481e36 commit 2c4925b

File tree

3 files changed: +21 -23 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 8 deletions
@@ -2660,13 +2660,18 @@ object SQLConf {
   }
 
   val BUCKET_READ_STRATEGY_IN_JOIN =
-    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin")
-      .doc("When set to COALESCE, if two bucketed tables with the different number of buckets " +
-        "are joined, the side with a bigger number of buckets will be coalesced to have the same " +
-        "number of buckets as the other side. When set to REPARTITION, the side with a bigger " +
-        "number of buckets will be repartitioned to have the same number of buckets as the other " +
-        "side. The bigger number of buckets must be divisible by the smaller number of buckets, " +
-        "and the strategy is applied to sort-merge joins and shuffled hash joins. " +
+    buildConf("spark.sql.sources.bucketing.readStrategyInJoin")
+      .doc("The bucket read strategy can be set to one of " +
+        BucketReadStrategyInJoin.values.mkString(", ") +
+        s". When set to ${BucketReadStrategyInJoin.COALESCE}, if two bucketed tables with " +
+        "different number of buckets are joined, the side with a bigger number of buckets will " +
+        "be coalesced to have the same number of buckets as the smaller side. When set to " +
+        s"${BucketReadStrategyInJoin.REPARTITION}, the side with a smaller number of buckets " +
+        "will be repartitioned to have the same number of buckets as the bigger side. For either " +
+        "coalescing or repartitioning to be applied, the bigger number of buckets must be " +
+        "divisible by the smaller number of buckets, and the strategy is applied to sort-merge " +
+        s"joins and shuffled hash joins. By default, the read strategy is set to " +
+        s"${BucketReadStrategyInJoin.OFF}, and neither coalescing nor repartitioning is applied. " +
         "Note: Coalescing bucketed table can avoid unnecessary shuffle in join, but it also " +
         "reduces parallelism and could possibly cause OOM for shuffled hash join. Repartitioning " +
         "bucketed table avoids unnecessary shuffle in join while maintaining the parallelism " +
@@ -2678,7 +2683,7 @@ object SQLConf {
       .createWithDefault(BucketReadStrategyInJoin.OFF.toString)
 
   val BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin.maxBucketRatio")
+    buildConf("spark.sql.sources.bucketing.readStrategyInJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced/repartitioned should be " +
         "less than or equal to this value for bucket coalescing/repartitioning to be applied. " +
         s"This configuration only has an effect when '${BUCKET_READ_STRATEGY_IN_JOIN.key}' " +

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 6 additions & 3 deletions
@@ -180,8 +180,7 @@ case class FileSourceScanExec(
   }
 
   @transient private lazy val isRepartitioningBuckets: Boolean = {
-    relation.bucketSpec.isDefined &&
-      optionalNewNumBuckets.isDefined &&
+    bucketedScan && optionalNewNumBuckets.isDefined &&
       optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets
   }
 
@@ -593,7 +592,11 @@ case class FileSourceScanExec(
       driverMetrics("numFiles") = filesNum
       driverMetrics("filesSize") = filesSize
       new BucketRepartitioningRDD(
-        fsRelation.sparkSession, readFile, filePartitions, bucketSpec, newNumBuckets, output)
+        fsRelation.sparkSession,
+        readFile,
+        filePartitions,
+        outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression,
+        output)
     }
   }
 }
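
The scan now hands BucketRepartitioningRDD a ready-made bucket-id expression taken from its HashPartitioning output partitioning, instead of a BucketSpec plus a target bucket count. A small sketch of what that expression is; the helper name is ours, but HashPartitioning.partitionIdExpression is the same expression the removed code in the next file built by hand:

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

// partitionIdExpression evaluates to pmod(murmur3hash(bucketColumns), numBuckets),
// i.e. the same mapping Spark uses to assign rows to buckets when writing.
def bucketIdExpressionFor(bucketColumns: Seq[Attribute], numBuckets: Int): Expression =
  HashPartitioning(bucketColumns, numBuckets).partitionIdExpression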

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala

Lines changed: 2 additions & 12 deletions
@@ -22,9 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -35,12 +33,9 @@ private[spark] class BucketRepartitioningRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: PartitionedFile => Iterator[InternalRow],
     @transient override val filePartitions: Seq[FilePartition],
-    bucketSpec: BucketSpec,
-    numRepartitionedBuckets: Int,
+    bucketIdExpression: Expression,
     output: Seq[Attribute])
   extends FileScanRDD(sparkSession, readFunction, filePartitions) {
-  assert(numRepartitionedBuckets > bucketSpec.numBuckets)
-  assert(numRepartitionedBuckets % bucketSpec.numBuckets == 0)
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val iter: Iterator[_] = super.compute(split, context)
@@ -51,11 +46,6 @@ private[spark] class BucketRepartitioningRDD(
   }
 
   private lazy val getBucketId: InternalRow => Int = {
-    val bucketIdExpression = {
-      val bucketColumns = bucketSpec.bucketColumnNames.map(c => output.find(_.name == c).get)
-      HashPartitioning(bucketColumns, numRepartitionedBuckets).partitionIdExpression
-    }
-
     val projection = UnsafeProjection.create(Seq(bucketIdExpression), output)
     row => projection(row).getInt(0)
   }
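
With the BucketSpec plumbing removed, getBucketId reduces to projecting the caller-supplied expression over each row. A self-contained sketch of that pattern; the standalone helper name is ours, not part of the commit:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}

def makeGetBucketId(bucketIdExpression: Expression, output: Seq[Attribute]): InternalRow => Int = {
  // Evaluate the single bucket-id expression against rows with the given output schema.
  val projection = UnsafeProjection.create(Seq(bucketIdExpression), output)
  row => projection(row).getInt(0)
}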
