[SPARK-32656][SQL] Repartition bucketed tables for sort merge join / shuffled hash join if applicable #29473

Closed · wants to merge 11 commits
SQLConf.scala
@@ -2655,24 +2655,39 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val COALESCE_BUCKETS_IN_JOIN_ENABLED =
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
.doc("When true, if two bucketed tables with the different number of buckets are joined, " +
"the side with a bigger number of buckets will be coalesced to have the same number " +
"of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
"number of buckets. Bucket coalescing is applied to sort-merge joins and " +
"shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
"in join, but it also reduces parallelism and could possibly cause OOM for " +
"shuffled hash join.")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
object BucketReadStrategyInJoin extends Enumeration {
val COALESCE, REPARTITION, OFF = Value
}

val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
.doc("The ratio of the number of two buckets being coalesced should be less than or " +
"equal to this value for bucket coalescing to be applied. This configuration only " +
s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
val BUCKET_READ_STRATEGY_IN_JOIN =
buildConf("spark.sql.sources.bucketing.readStrategyInJoin")
.doc("The bucket read strategy can be set to one of " +
BucketReadStrategyInJoin.values.mkString(", ") +
s". When set to ${BucketReadStrategyInJoin.COALESCE}, if two bucketed tables with " +
"different number of buckets are joined, the side with a bigger number of buckets will " +
"be coalesced to have the same number of buckets as the smaller side. When set to " +
s"${BucketReadStrategyInJoin.REPARTITION}, the side with a smaller number of buckets " +
"will be repartitioned to have the same number of buckets as the bigger side. For either " +
"coalescing or repartitioning to be applied, The bigger number of buckets must be " +
"divisible by the smaller number of buckets, and the strategy is applied to sort-merge " +
s"joins and shuffled hash joins. By default, the read strategy is set to " +
s"${BucketReadStrategyInJoin.OFF}, and neither coalescing nor reparitioning is applied. " +
"Note: Coalescing bucketed table can avoid unnecessary shuffle in join, but it also " +
"reduces parallelism and could possibly cause OOM for shuffled hash join. Repartitioning " +
"bucketed table avoids unnecessary shuffle in join while maintaining the parallelism " +
"at the cost of reading duplicate data.")
.version("3.1.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(BucketReadStrategyInJoin.values.map(_.toString))
.createWithDefault(BucketReadStrategyInJoin.OFF.toString)

val BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO =
buildConf("spark.sql.sources.bucketing.readStrategyInJoin.maxBucketRatio")
.doc("The ratio of the number of two buckets being coalesced/repartitioned should be " +
"less than or equal to this value for bucket coalescing/repartitioning to be applied. " +
s"This configuration only has an effect when '${BUCKET_READ_STRATEGY_IN_JOIN.key}' " +
s"is set to a strategy other than '${BucketReadStrategyInJoin.OFF}'.")
.version("3.1.0")
.intConf
.checkValue(_ > 0, "The ratio must be positive.")
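
A minimal usage sketch of the two configs above, assuming an active SparkSession named spark (as in spark-shell); the table names and bucket counts are made up for illustration:

// Enable read-side bucket repartitioning for joins (config keys introduced above).
spark.conf.set("spark.sql.sources.bucketing.readStrategyInJoin", "repartition")
spark.conf.set("spark.sql.sources.bucketing.readStrategyInJoin.maxBucketRatio", "4")

// Two bucketed tables whose bucket counts differ by a divisible factor.
spark.range(1000).write.bucketBy(8, "id").sortBy("id").saveAsTable("bucketed_8")
spark.range(1000).write.bucketBy(4, "id").sortBy("id").saveAsTable("bucketed_4")

// With REPARTITION, the 4-bucket side is read as 8 buckets (each bucket file is read twice
// and filtered by bucket id), so the join needs no shuffle on either side.
spark.table("bucketed_8").join(spark.table("bucketed_4"), "id").explain()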
@@ -3325,16 +3340,17 @@ class SQLConf extends Serializable with Logging {

def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)

def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED)

def coalesceBucketsInJoinMaxBucketRatio: Int =
getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO)
def bucketReadStrategyInJoinMaxBucketRatio: Int =
getConf(SQLConf.BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO)

def optimizeNullAwareAntiJoin: Boolean =
getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN)

def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

def bucketReadStrategyInJoin: BucketReadStrategyInJoin.Value =
BucketReadStrategyInJoin.withName(getConf(BUCKET_READ_STRATEGY_IN_JOIN))

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
DataSourceScanExec.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.bucketing.BucketRepartitioningRDD
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -154,7 +155,7 @@ case class RowDataSourceScanExec(
* @param requiredSchema Required schema of the underlying relation, excluding partition columns.
* @param partitionFilters Predicates to use for partition pruning.
* @param optionalBucketSet Bucket ids for bucket pruning.
* @param optionalNumCoalescedBuckets Number of coalesced buckets.
* @param optionalNewNumBuckets Target number of buckets to coalesce or repartition to.
* @param dataFilters Filters on non-partition columns.
* @param tableIdentifier identifier for the table in the metastore.
*/
@@ -164,15 +165,23 @@ case class FileSourceScanExec(
requiredSchema: StructType,
partitionFilters: Seq[Expression],
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
optionalNewNumBuckets: Option[Int],
dataFilters: Seq[Expression],
tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec {

// Note that some vals referring the file-based relation are lazy intentionally
// so that this plan can be canonicalized on executor side too. See SPARK-23731.
override lazy val supportsColumnar: Boolean = {
relation.fileFormat.supportBatch(relation.sparkSession, schema)
// `BucketRepartitioningRDD` converts columnar batches to rows in order to compute the bucket
// id of each row, so columnar output is disabled when `BucketRepartitioningRDD` is used to
// avoid converting batches to rows and then back to batches.
relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isRepartitioningBuckets
Review comment (Member):
I'm not sure how much performance gain columnar execution provides here, though. Is the proposed idea to give up that gain and use bucket repartitioning instead?

Reply (imback82, Contributor, Author, Aug 27, 2020):
Note that the data source will still be read as batches in this case (if whole-stage codegen is enabled).

Physical plans operate on rows, so batches are converted to rows via ColumnarToRow anyway. I think the perf impact would be minimal here; the difference would be the code-generated columnar-to-row conversion vs. iterating batch.rowIterator() in BucketRepartitioningRDD.

}

@transient private lazy val isRepartitioningBuckets: Boolean = {
Review comment (Member):
nit: we don't need the explicit : Boolean here?

Reply (Contributor, Author):
I followed the same style as override lazy val supportsColumnar: Boolean, etc. Is it still not needed?

bucketedScan && optionalNewNumBuckets.isDefined &&
optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets
}

private lazy val needsUnsafeRowConversion: Boolean = {
@@ -292,7 +301,7 @@ case class FileSourceScanExec(
// above
val spec = relation.bucketSpec.get
val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
val numPartitions = optionalNumCoalescedBuckets.getOrElse(spec.numBuckets)
val numPartitions = optionalNewNumBuckets.getOrElse(spec.numBuckets)
val partitioning = HashPartitioning(bucketColumns, numPartitions)
val sortColumns =
spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)
@@ -314,7 +323,7 @@ case class FileSourceScanExec(
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

// TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
if (singleFilePartitions && (optionalNewNumBuckets.isEmpty || isRepartitioningBuckets)) {
Review comment (Contributor):
We don't need || isRepartitioningBuckets here, right?

Reply (Contributor, Author):
Repartitioning can still maintain the sort order, whereas coalescing cannot, so this check is needed.

// TODO Currently Spark does not support writing columns sorting in descending order
// so using Ascending order. This can be fixed in future
sortColumns.map(attribute => SortOrder(attribute, Ascending))
@@ -360,7 +369,9 @@ case class FileSourceScanExec(
}
metadata + ("SelectedBucketsCount" ->
(s"$numSelectedBuckets out of ${spec.numBuckets}" +
optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
optionalNewNumBuckets.map { b =>
if (b > spec.numBuckets) s" (Repartitioned to $b)" else s" (Coalesced to $b)"
}.getOrElse("")))
} getOrElse {
metadata
}
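
For illustration (bucket counts assumed, not output captured from this PR's tests), the string built above would render in the scan node's metadata roughly as follows for a 4-bucket table repartitioned to 8 buckets, and for an 8-bucket table coalesced to 4:

SelectedBucketsCount: 4 out of 4 (Repartitioned to 8)
SelectedBucketsCount: 8 out of 8 (Coalesced to 4)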
@@ -548,22 +559,46 @@ case class FileSourceScanExec(
filesGroupedToBuckets
}

val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
Seq.tabulate(numCoalescedBuckets) { bucketId =>
val partitionedFiles = coalescedBuckets.get(bucketId).map {
_.values.flatten.toArray
}.getOrElse(Array.empty)
FilePartition(bucketId, partitionedFiles)
}
}.getOrElse {
Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
if (optionalNewNumBuckets.isEmpty) {
val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
}
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
} else {
val newNumBuckets = optionalNewNumBuckets.get
if (newNumBuckets < bucketSpec.numBuckets) {
assert(bucketSpec.numBuckets % newNumBuckets == 0)
logInfo(s"Coalescing to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets")
val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets)
val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
val partitionedFiles = coalescedBuckets
.get(bucketId)
.map(_.values.flatten.toArray)
.getOrElse(Array.empty)
FilePartition(bucketId, partitionedFiles)
}
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
} else {
assert(newNumBuckets % bucketSpec.numBuckets == 0)
logInfo(s"Repartitioning to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets")
val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
FilePartition(
bucketId,
prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty))
}
// Repartitioning reads the same bucket files multiple times, so update the driver-side
// file metrics to reflect the extra reads.
val filesNum = filePartitions.map(_.files.size.toLong).sum
val filesSize = filePartitions.map(_.files.map(_.length).sum).sum
driverMetrics("numFiles") = filesNum
Review comment (Contributor):
Per setFilesNumAndSizeMetric, should we set staticFilesNum here or numFiles?

Reply (Contributor, Author):
I think staticFilesNum is used only for dynamic partition pruning:

/** SQL metrics generated only for scans using dynamic partition pruning. */
private lazy val staticMetrics = if (partitionFilters.filter(isDynamicPruningFilter).nonEmpty) {
Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
"staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static size of files read"))

driverMetrics("filesSize") = filesSize
new BucketRepartitioningRDD(
fsRelation.sparkSession,
readFile,
filePartitions,
outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression,
output)
}
}

new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
}
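
A small self-contained sketch (bucket counts assumed) of the two mappings implemented above: coalescing groups file buckets by fileBucketId % newNumBuckets, while repartitioning has output bucket b re-read file bucket b % numBuckets and relies on BucketRepartitioningRDD to drop the rows that hash to a different bucket:

// Coalescing 8 file buckets into 4 output partitions: buckets 0 and 4 are read together, etc.
val coalescedMapping = (0 until 8).groupBy(_ % 4)
// Map(0 -> Vector(0, 4), 1 -> Vector(1, 5), 2 -> Vector(2, 6), 3 -> Vector(3, 7))

// Repartitioning 4 file buckets into 8 output partitions: each bucket file is read twice.
val repartitionedMapping = (0 until 8).map(outputBucket => outputBucket -> outputBucket % 4)
// Vector((0,0), (1,1), (2,2), (3,3), (4,0), (5,1), (6,2), (7,3))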

/**
@@ -622,7 +657,7 @@ case class FileSourceScanExec(
QueryPlan.normalizePredicates(
filterUnusedDynamicPruningExpressions(partitionFilters), output),
optionalBucketSet,
optionalNumCoalescedBuckets,
optionalNewNumBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None)
}
QueryExecution.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin
import org.apache.spark.sql.execution.bucketing.CoalesceOrRepartitionBucketsInJoin
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -339,7 +339,7 @@ object QueryExecution {
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
adaptiveExecutionRule.toSeq ++
Seq(
CoalesceBucketsInJoin(sparkSession.sessionState.conf),
CoalesceOrRepartitionBucketsInJoin(sparkSession.sessionState.conf),
PlanDynamicPruningFilters(sparkSession),
PlanSubqueries(sparkSession),
RemoveRedundantProjects(sparkSession.sessionState.conf),
BucketRepartitioningRDD.scala (new file)
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.bucketing

import scala.collection.JavaConverters._

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* An RDD that reads bucketed files and filters out the rows whose bucket id does not match the
* id of the partition (split) being computed; used when a bucketed table is repartitioned to a
* larger number of buckets at read time.
*/
private[spark] class BucketRepartitioningRDD(
@transient private val sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient override val filePartitions: Seq[FilePartition],
bucketIdExpression: Expression,
output: Seq[Attribute])
extends FileScanRDD(sparkSession, readFunction, filePartitions) {

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iter: Iterator[_] = super.compute(split, context)
iter.flatMap {
case row: InternalRow => Seq(row)
case batch: ColumnarBatch => batch.rowIterator().asScala
}.filter(getBucketId(_) == split.index)
}

private lazy val getBucketId: InternalRow => Int = {
val projection = UnsafeProjection.create(Seq(bucketIdExpression), output)
row => projection(row).getInt(0)
}
}
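
A rough way to see the filter above in action, assuming (as with the partitionIdExpression passed in from FileSourceScanExec) that the bucket id is the Murmur3 hash of the bucket columns pmod the bucket count, that the SQL hash function uses the same Murmur3 hash, and that an active SparkSession named spark is available:

import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Compute, per row, the output bucket id an 8-bucket repartitioned read would assign.
// The split with index i keeps only the rows where bucketId == i; the other split that
// re-reads the same 4-bucket file keeps the complementary rows.
val withBucketId = spark.range(10).withColumn("bucketId", pmod(hash(col("id")), lit(8)))
withBucketId.show()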