[SPARK-32656][SQL] Repartition bucketed tables for sort merge join / shuffled hash join if applicable #29473

@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.bucketing.BucketRepartitioningRDD
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

@@ -154,7 +155,7 @@ case class RowDataSourceScanExec(
 * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
 * @param partitionFilters Predicates to use for partition pruning.
 * @param optionalBucketSet Bucket ids for bucket pruning.
 * @param optionalNumCoalescedBuckets Number of coalesced buckets.
 * @param optionalNewNumBuckets Number of buckets to coalesce or repartition.
 * @param dataFilters Filters on non-partition columns.
 * @param tableIdentifier identifier for the table in the metastore.
 */

@@ -164,15 +165,23 @@ case class FileSourceScanExec(
    requiredSchema: StructType,
    partitionFilters: Seq[Expression],
    optionalBucketSet: Option[BitSet],
    optionalNumCoalescedBuckets: Option[Int],
    optionalNewNumBuckets: Option[Int],
    dataFilters: Seq[Expression],
    tableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec {

  // Note that some vals referring the file-based relation are lazy intentionally
  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
  override lazy val supportsColumnar: Boolean = {
    relation.fileFormat.supportBatch(relation.sparkSession, schema)
    // `RepartitioningBucketRDD` converts columnar batches to rows to calculate bucket id for each
    // row, thus columnar is not supported when `RepartitioningBucketRDD` is used to avoid
    // conversions from batches to rows and back to batches.
    relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isRepartitioningBuckets
  }

  @transient private lazy val isRepartitioningBuckets: Boolean = {
Review comment: nit: we don't need
Reply: I followed the same style from
    bucketedScan && optionalNewNumBuckets.isDefined &&
      optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets
  }

  private lazy val needsUnsafeRowConversion: Boolean = {

@@ -292,7 +301,7 @@ case class FileSourceScanExec(
      // above
      val spec = relation.bucketSpec.get
      val bucketColumns = spec.bucketColumnNames.flatMap(n => toAttribute(n))
      val numPartitions = optionalNumCoalescedBuckets.getOrElse(spec.numBuckets)
      val numPartitions = optionalNewNumBuckets.getOrElse(spec.numBuckets)
      val partitioning = HashPartitioning(bucketColumns, numPartitions)
      val sortColumns =
        spec.sortColumnNames.map(x => toAttribute(x)).takeWhile(x => x.isDefined).map(_.get)

@@ -314,7 +323,7 @@ case class FileSourceScanExec(
      val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

      // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
      if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
      if (singleFilePartitions && (optionalNewNumBuckets.isEmpty || isRepartitioningBuckets)) {
Review comment: we don't need
Reply: Repartition can still maintain the sort order, whereas coalescing cannot, thus this check is needed (see the sketch after this hunk).
        // TODO Currently Spark does not support writing columns sorting in descending order
        // so using Ascending order. This can be fixed in future
        sortColumns.map(attribute => SortOrder(attribute, Ascending))
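
To make the reply above concrete, here is a small standalone sketch (illustration only, not part of this patch; a plain modulo stands in for Spark's hash-based bucket id) of why coalescing loses the within-partition sort order while repartitioning keeps it:

object SortOrderSketch {
  // Stand-in for the real bucket id computation (a hash of the bucket columns); a plain
  // modulo keeps the example self-contained and deterministic.
  private def bucketId(value: Int, numBuckets: Int): Int = value % numBuckets

  def main(args: Array[String]): Unit = {
    // Two bucket files of a table written with 4 buckets; each file is sorted on its own.
    val bucket0File = Seq(4, 8, 12)
    val bucket2File = Seq(2, 6, 10)

    // Coalescing 4 -> 2 buckets: new partition 0 concatenates the files of old buckets
    // 0 and 2, so the combined sequence is no longer sorted.
    val coalescedPartition0 = bucket0File ++ bucket2File
    println(coalescedPartition0) // List(4, 8, 12, 2, 6, 10) -- sort order lost

    // Repartitioning 4 -> 8 buckets: new partition 4 re-reads old bucket 0's single file
    // and only filters rows, so the surviving rows keep their original sorted order.
    val repartitionedPartition4 = bucket0File.filter(v => bucketId(v, numBuckets = 8) == 4)
    println(repartitionedPartition4) // List(4, 12) -- still sorted
  }
}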

@@ -360,7 +369,9 @@ case class FileSourceScanExec(
        }
        metadata + ("SelectedBucketsCount" ->
          (s"$numSelectedBuckets out of ${spec.numBuckets}" +
            optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)"}.getOrElse("")))
            optionalNewNumBuckets.map { b =>
              if (b > spec.numBuckets) s" (Repartitioned to $b)" else s" (Coalesced to $b)"
            }.getOrElse("")))
      } getOrElse {
        metadata
      }

@@ -548,22 +559,46 @@ case class FileSourceScanExec(
      filesGroupedToBuckets
    }

    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
      Seq.tabulate(numCoalescedBuckets) { bucketId =>
        val partitionedFiles = coalescedBuckets.get(bucketId).map {
          _.values.flatten.toArray
        }.getOrElse(Array.empty)
        FilePartition(bucketId, partitionedFiles)
      }
    }.getOrElse {
      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
    if (optionalNewNumBuckets.isEmpty) {
      val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
      }
      new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
    } else {
      val newNumBuckets = optionalNewNumBuckets.get
      if (newNumBuckets < bucketSpec.numBuckets) {
        assert(bucketSpec.numBuckets % newNumBuckets == 0)
        logInfo(s"Coalescing to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets")
        val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets)
        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
          val partitionedFiles = coalescedBuckets
            .get(bucketId)
            .map(_.values.flatten.toArray)
            .getOrElse(Array.empty)
          FilePartition(bucketId, partitionedFiles)
        }
        new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
      } else {
        assert(newNumBuckets % bucketSpec.numBuckets == 0)
        logInfo(s"Repartitioning to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets")
        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
          FilePartition(
            bucketId,
            prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty))
        }
        // There are now more files to be read.
        val filesNum = filePartitions.map(_.files.size.toLong).sum
        val filesSize = filePartitions.map(_.files.map(_.length).sum).sum
        driverMetrics("numFiles") = filesNum
Review comment: per
Reply: I think (references spark/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala, lines 421 to 424 in cfe012a)
        driverMetrics("filesSize") = filesSize
        new BucketRepartitioningRDD(
          fsRelation.sparkSession,
          readFile,
          filePartitions,
          outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression,
          output)
      }
    }

    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
  }

  /**
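
As a reading aid for the branches above, here is a standalone sketch (not part of this patch; the bucket counts are made up but respect the divisibility asserts) of which original bucket files each new partition ends up reading in the coalesce and repartition cases:

object BucketMappingSketch {
  // Coalescing: each new bucket concatenates the files of every old bucket that maps to it
  // via `oldBucketId % newNumBuckets`, mirroring the groupBy in the coalesce branch above.
  def coalescedFileGroups(oldNumBuckets: Int, newNumBuckets: Int): Map[Int, Seq[Int]] = {
    require(oldNumBuckets % newNumBuckets == 0)
    (0 until oldNumBuckets).groupBy(_ % newNumBuckets)
  }

  // Repartitioning: each new bucket re-reads exactly one old bucket file
  // (`newBucketId % oldNumBuckets`) and relies on BucketRepartitioningRDD to filter rows.
  def repartitionedFile(oldNumBuckets: Int, newNumBuckets: Int): Map[Int, Int] = {
    require(newNumBuckets % oldNumBuckets == 0)
    (0 until newNumBuckets).map(b => b -> b % oldNumBuckets).toMap
  }

  def main(args: Array[String]): Unit = {
    println(coalescedFileGroups(oldNumBuckets = 8, newNumBuckets = 4))
    // e.g. 0 -> (0, 4), 1 -> (1, 5), 2 -> (2, 6), 3 -> (3, 7)
    println(repartitionedFile(oldNumBuckets = 4, newNumBuckets = 8))
    // e.g. 0 -> 0, 4 -> 0, 1 -> 1, 5 -> 1, ...
  }
}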

@@ -622,7 +657,7 @@ case class FileSourceScanExec(
      QueryPlan.normalizePredicates(
        filterUnusedDynamicPruningExpressions(partitionFilters), output),
      optionalBucketSet,
      optionalNumCoalescedBuckets,
      optionalNewNumBuckets,
      QueryPlan.normalizePredicates(dataFilters, output),
      None)
  }

New file: BucketRepartitioningRDD.scala

@@ -0,0 +1,52 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.bucketing

import scala.collection.JavaConverters._

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * An RDD that filters out the rows that do not belong to the current bucket file being read.
 */
private[spark] class BucketRepartitioningRDD(
    @transient private val sparkSession: SparkSession,
    readFunction: PartitionedFile => Iterator[InternalRow],
    @transient override val filePartitions: Seq[FilePartition],
    bucketIdExpression: Expression,
    output: Seq[Attribute])
  extends FileScanRDD(sparkSession, readFunction, filePartitions) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val iter: Iterator[_] = super.compute(split, context)
    iter.flatMap {
      case row: InternalRow => Seq(row)
      case batch: ColumnarBatch => batch.rowIterator().asScala
    }.filter(getBucketId(_) == split.index)
  }

  private lazy val getBucketId: InternalRow => Int = {
    val projection = UnsafeProjection.create(Seq(bucketIdExpression), output)
    row => projection(row).getInt(0)
  }
}
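
For reference, the bucketIdExpression passed in from FileSourceScanExec is HashPartitioning.partitionIdExpression, which (to my understanding) evaluates to pmod(murmur3 hash of the bucket columns, numBuckets). A rough, self-contained way to look at the per-row bucket ids this filter compares against split.index, using only public APIs (the column names and bucket count below are made up):

// Rough sketch, not part of the patch: the SQL hash() function is the same Murmur3 hash
// that HashPartitioning uses by default, so pmod(hash(bucket columns), numBuckets)
// approximates the bucket id that BucketRepartitioningRDD checks per row.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object BucketIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("bucket-id-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical table bucketed by `key` into 8 buckets.
    val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("key", "value")
    df.withColumn("bucket_id", expr("pmod(hash(key), 8)")).show()

    spark.stop()
  }
}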
Review comment: I'm not sure how much of a performance gain this columnar execution provides; is the proposed idea to give up those gains and use bucket repartitioning instead?
Reply: Note that the datasource will still be read as batches in this case (if whole-stage codegen is enabled). I see that physical plans operate on rows, so batches are converted to rows via ColumnarToRow anyway. So I think the perf impact would be minimal here; the difference could be the code-gen conversion from columnar to row vs. iterating batch.rowIterator() in BucketRepartitioningRDD.
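
A quick way to check the point above on a concrete query (not part of this patch; the parquet path is a made-up example): if a scan is read as columnar batches, a ColumnarToRow node shows up above the file scan in the physical plan.

import org.apache.spark.sql.SparkSession

object ColumnarScanCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("columnar-check").getOrCreate()

    // Inspect the physical plan of a scan over some existing parquet data; look for a
    // ColumnarToRow node (the batch -> row conversion mentioned in the reply above).
    spark.read.parquet("/tmp/some_bucketed_table").explain()

    spark.stop()
  }
}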