[SPARK-29544] [SQL] optimize skewed partition based on data size #26434
Conversation
@cloud-fan Please help me review. Thanks for your help.

Test build #113434 has finished for PR 26434 at commit

Test build #113435 has finished for PR 26434 at commit

Test build #113444 has finished for PR 26434 at commit

Please help to retest. Thanks.

retest this please

Test build #113484 has finished for PR 26434 at commit
```scala
 * Get a reader for the specific partitionIndex in map output statistics that are
 * produced by range mappers. Called on executors by reduce tasks.
 */
def getReaderForRangeMapper[K, C](
```
We can probably merge this method with `getReaderForOneMapper`; one mapper is just a special case of range mappers.
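A minimal sketch of the merged-API idea, using a hypothetical trait and method names (this is not the actual MapOutputTracker/ShuffleManager interface):

```scala
import org.apache.spark.TaskContext

// Hypothetical sketch: one range-based reader method; reading a single mapper
// is just the special case of the range [mapIndex, mapIndex + 1).
trait RangeShuffleReaderSupport {
  def getReaderForMapperRange[K, C](
      shuffleId: Int,
      startMapIndex: Int,  // inclusive
      endMapIndex: Int,    // exclusive
      partitionIndex: Int,
      context: TaskContext): Iterator[Product2[K, C]]

  // The existing single-mapper call collapses into a one-element range.
  def getReaderForOneMapper[K, C](
      shuffleId: Int,
      mapIndex: Int,
      partitionIndex: Int,
      context: TaskContext): Iterator[Product2[K, C]] =
    getReaderForMapperRange(shuffleId, mapIndex, mapIndex + 1, partitionIndex, context)
}
```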
```scala
  }
}

case class SkewedShuffleReaderExec(
```
This is different from local/coalesced shuffle reader as it reads only one reduce partition. Maybe better to call it PostShufflePartitionReader
```scala
val size = metrics.bytesByPartitionId(partitionId)
val factor = size / medianSize
val numMappers = getShuffleStage(stage).
  plan.shuffleDependency.rdd.partitions.length
```
We can easily get the data size of each mapper; shall we split mapper ranges based on data size?
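For illustration, a small stand-alone helper showing what a size-based split could look like; the function name and the `targetSize` parameter are assumptions, not code from this PR:

```scala
import scala.collection.mutable.ArrayBuffer

// Greedily group contiguous mapper indices so that each group's total output size
// stays close to `targetSize`. Returns (start, end) pairs with the end exclusive.
def splitMapperRangesBySize(mapperSizes: Array[Long], targetSize: Long): Seq[(Int, Int)] = {
  val ranges = ArrayBuffer.empty[(Int, Int)]
  var start = 0
  var acc = 0L
  var i = 0
  while (i < mapperSizes.length) {
    if (acc > 0 && acc + mapperSizes(i) > targetSize) {
      ranges += ((start, i))  // close the current range before it exceeds the target
      start = i
      acc = 0L
    }
    acc += mapperSizes(i)
    i += 1
  }
  ranges += ((start, mapperSizes.length))
  ranges.toSeq
}
```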
```scala
@@ -279,6 +279,24 @@ private[sql] trait SQLTestData { self =>
    df
  }

  protected lazy val skewData1: DataFrame = {
```
Let's define the data where it is used. This file should only contain dataframes that are used by multiple test suites.
Test build #113622 has finished for PR 26434 at commit
```scala
 * and the second item is a sequence of (shuffle block id, shuffle block size, map index)
 * tuples describing the shuffle blocks that are stored at that block manager.
 */
def convertMapStatuses(
```
Can we merge the two `convertMapStatuses` methods?
```diff
@@ -116,7 +116,8 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
 class ShuffledRowRDD(
     var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
     metrics: Map[String, SQLMetric],
-    specifiedPartitionStartIndices: Option[Array[Int]] = None)
+    specifiedPartitionStartIndices: Option[Array[Int]] = None,
```
How about `Option[Array[(Int, Int)]]`?
Maybe separate definitions for `specifiedPartitionStartIndices` and `specifiedPartitionEndIndices` would be clearer?
`Option[Array[(Int, Int)]]` is more type-safe. It eliminates problems like:
- `specifiedPartitionStartIndices` is specified but `specifiedPartitionEndIndices` is not.
- the two arrays have different lengths.
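A tiny sketch of the two signatures being compared; the class names are illustrative, only the parameter shapes matter:

```scala
// Two parallel optional arrays: either one can be passed without the other,
// and their lengths can silently differ.
class WithSeparateIndices(
    specifiedPartitionStartIndices: Option[Array[Int]] = None,
    specifiedPartitionEndIndices: Option[Array[Int]] = None)

// One optional array of (start, end) pairs: a start index can never exist
// without its matching end index, and the lengths always agree.
class WithPairedIndices(
    specifiedPartitionRanges: Option[Array[(Int, Int)]] = None)
```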
```scala
          partitionId, rightMapIdStartIndices(j), rightEndMapId)

        subJoins +=
          SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,
```
Just for brainstorming: here we are joining 2 partitions, not 2 RDDs, so there will be no shuffle. Is it better to run a hash join than SMJ? cc @maryannxue
Furthermore, if only one side is skewed, maybe it's better to plan a broadcast hash join instead of a union of many SMJs?
Force-pushed from bbf585d to 18cdcd9
@cloud-fan I fixed the conflicts and resolved the comments. Please help review again. Thanks for your help.
```scala
var postMapPartitionSize: Long = mapPartitionSize(i)
partitionStartIndices += i
while (i < numMappers && i + 1 < numMappers) {
  val nextIndex = if (i + 1 < numMappers) {
```
Isn't this always true?
@manuzhang Thanks for your review. After an offline discussion with Wenchen, we decided to remove this method and split the skewed partition by the number of mappers.
```scala
      i + 1
    } else numMappers - 1

    if (postMapPartitionSize + mapPartitionSize(nextIndex) > advisoryTargetPostShuffleInputSize) {
```
What if this never comes true when `adaptiveSkewedSizeThreshold` is smaller than `targetPostShuffleInputSize`? I'm also wondering whether `targetPostShuffleInputSize` can be reused for the threshold.
Test build #114751 has finished for PR 26434 at commit
```scala
        }
      case None =>
        Iterator.empty
    }
  }
```
|
nit: unnecessary change
```scala
@@ -410,6 +410,27 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED = buildConf("spark.sql.adaptive.skewedJoin.enabled")
```
nit: spark.sql.adaptive.optimizeSkewedJoin.enabled
```scala
buildConf("spark.sql.adaptive.skewedPartitionFactor")
  .doc("A partition is considered as a skewed partition if its size is larger than" +
    " this factor multiple the median partition size and also larger than " +
    "spark.sql.adaptive.skewedPartitionSizeThreshold.")
```
nit: use `${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key}` instead of hardcoding the config name.
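For context, the check described by the config doc quoted above boils down to something like the following sketch (parameter names are illustrative):

```scala
// A partition is considered skewed only if it is both larger than `factor` times
// the median partition size and larger than the absolute size threshold.
def isSkewed(size: Long, medianSize: Long, factor: Int, sizeThreshold: Long): Boolean =
  size > medianSize * factor && size > sizeThreshold
```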
```scala
def adaptiveSkewedFactor: Int = getConf(ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR)

def adaptiveSkewedSizeThreshold: Long =
```
nit: we don't need these 3 methods if they are only called 1 or 2 times.
```scala
val shuffleStageCheck = ShuffleQueryStageExec.isShuffleQueryStageExec(leftStage) &&
  ShuffleQueryStageExec.isShuffleQueryStageExec(rightStage)
val statisticsReady: Boolean = if (shuffleStageCheck) {
  getStatistics(leftStage) != null && getStatistics(rightStage) != null
```
how can the stats be null?
It seems it cannot be null. I had a wrong understanding that the leftStage and rightStage might not be done simultaneously.
```scala
  joinTypeSupported && statisticsReady
}

private def supportSplitOnLeftPartition(joinType: JoinType) = joinType != RightOuter
```
nit: it's more robust to list the supported join types.
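A possible shape for that suggestion, using the join types this PR states it supports (sketch only; the method name comes from the quoted snippet, the `private` modifier is dropped so it stands alone):

```scala
import org.apache.spark.sql.catalyst.plans._

// Explicitly list the join types whose left side may be split, instead of
// excluding RightOuter; unknown or new join types then default to "not supported".
def supportSplitOnLeftPartition(joinType: JoinType): Boolean = joinType match {
  case Inner | Cross | LeftSemi | LeftAnti | LeftOuter => true
  case _ => false
}
```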
@cloud-fan I have updated per the comments from the online and offline discussions. Please help me review again. Thanks.
Test build #4991 has finished for PR 26434 at commit

Test build #4992 has finished for PR 26434 at commit

retest this please

Test build #116654 has finished for PR 26434 at commit
create partial shuffle reader
Test build #116695 has finished for PR 26434 at commit
thanks, merging to master!
```scala
val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
val maxSplits = math.min(conf.getConf(
  SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
```
Why do we need this config? It seems a bit weird that we try to use the actual size everywhere else, and we are not doing it here. I think that just using `SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD` for the target partition size by itself should yield good results.
According to feedback from @manuzhang's environment, in some use cases the split number may be very large (more than 1000). There would be too many SMJs, and it may take a long time to launch the job after optimizing the skewed join. The UI would also be choked by the huge graph. So we added this configuration as an upper limit on the split number.
```scala
val partitionStartIndices = ArrayBuffer[Int]()
var postMapPartitionSize = mapPartitionSizes(0)
partitionStartIndices += 0
partitionIndices.drop(1).foreach { nextPartitionIndex =>
```
In some cases writing a while loop is actually easier to understand, e.g.:
```scala
var postMapPartitionSize = advisoryPartitionSize + 1
var nextPartitionIndex = 0
while (nextPartitionIndex < mapPartitionSizes.length) {
  val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex)
  if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
    partitionStartIndices += nextPartitionIndex
    postMapPartitionSize = 0
  }
  postMapPartitionSize += nextMapPartitionSize
  nextPartitionIndex += 1
}
```
Ok. I will update later.
```scala
  }
}

if (partitionStartIndices.size > maxSplits) {
```
This makes the last partition larger right? Isn't that adding some skew?
Yes, it may cause the last partition to be skewed. This approach cannot handle the extremely skewed use case.
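One possible way to apply the cap, consistent with the observation above that the last split absorbs the remainder (variable names follow the quoted snippet; the exact code in the PR may differ):

```scala
// Keep only the first `maxSplits` start indices; the final range then extends to
// the last mapper, so the last split can grow larger than the target size.
val cappedStartIndices =
  if (partitionStartIndices.size > maxSplits) partitionStartIndices.take(maxSplits)
  else partitionStartIndices
```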
```scala
 * This RDD takes a [[ShuffleDependency]] (`dependency`), a partitionIndex
 * and the range of startMapIndex to endMapIndex.
 */
class SkewedShuffledRowRDD(
```
Why do we need a separate implementation of the shuffled row RDD? I am wondering if we can combine them all, and have a couple of partition implementations depending on which (mapper/reducer) coordinate we need to read from.
BTW bifurcation of the query plan (part hash join, part smj) is slightly orthogonal to this.
This sounds like a good idea.
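A rough sketch of the combined-RDD idea being discussed here; the sealed trait and case class names below are illustrative, not the code in this PR:

```scala
// One general shuffled-row RDD could carry a per-partition spec describing
// which mapper/reducer coordinates to read.
sealed trait ShufflePartitionSpec

// Read reduce partitions [startReducerIndex, endReducerIndex) from all mappers
// (the coalesced-partitions case).
case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)
  extends ShufflePartitionSpec

// Read a single reduce partition, but only from mappers [startMapIndex, endMapIndex)
// (the skewed-partition split case).
case class PartialReducerPartitionSpec(reducerIndex: Int, startMapIndex: Int, endMapIndex: Int)
  extends ShufflePartitionSpec

// Read every reduce partition produced by a single mapper (the local-reader case).
case class PartialMapperPartitionSpec(mapIndex: Int, startReducerIndex: Int, endReducerIndex: Int)
  extends ShufflePartitionSpec
```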
```scala
private def medianSize(stats: MapOutputStatistics): Long = {
  val numPartitions = stats.bytesByPartitionId.length
  val bytes = stats.bytesByPartitionId.sorted
  if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1
```
`math.max(bytes(numPartitions / 2), 1)`?

OCD: you could argue that this median calculation is incorrect for an even number of elements. In that case it should be `(bytes(numPartitions / 2 - 1) + bytes(numPartitions / 2)) / 2`.
Good catch. I will update it later.
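Putting both suggestions together, a corrected median could look like this sketch (operating directly on the per-partition byte sizes; assumes at least one partition):

```scala
// Median of the sorted per-partition sizes, averaging the two middle elements
// when the count is even, and never returning less than 1 so that later
// divisions by the median stay safe.
def medianSize(bytesByPartitionId: Array[Long]): Long = {
  val bytes = bytesByPartitionId.sorted
  val n = bytes.length
  val median =
    if (n % 2 == 0) (bytes(n / 2 - 1) + bytes(n / 2)) / 2
    else bytes(n / 2)
  math.max(median, 1L)
}
```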
```scala
val leftMedSize = medianSize(leftStats)
val rightMedSize = medianSize(rightStats)
val leftSizeInfo = s"median size: $leftMedSize, max size: ${leftStats.bytesByPartitionId.max}"
```
NIT you can avoid materializing the string if debug logging is not enabled by making these defs.
Ok. I will update later.
```scala
val shuffleStages = collectShuffleStages(plan)

if (shuffleStages.length == 2) {
  // Currently we only support handling skewed join for 2 table join.
```
Is this being planned?
Is it that we want to avoid things like:
```
SMJ
  SMJ (w/ or w/o Sort)
    Shuffle (w/ or w/o Sort)
    Shuffle (w/ or w/o Sort)
  Shuffle (w/ or w/o Sort)
```
i.e., an SMJ that contains a child SMJ without a Shuffle on top, so we don't want to optimize the child SMJ because it changes the outputPartitioning? If so, can we make that clearer in the comment?
and since it affects the outputPartitioning, we need to check if it breaks the operators above.
There are scenarios where we can do this for multiple joins, right? `INNER`, for example.
It may be too complex to optimize multiple joins, so we optimize the 2-table join first. We can further optimize multiple joins later.
@maryannxue Yes, we need to consider the effect of outputPartitioning. I will update it in the following PRs.
```scala
  stage.shuffle.shuffleDependency.rdd.partitions.length
}

def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
```
In general it would be good to have some documentation for this method. What you are doing here is not completely trivial :). The same applies for the code inside the method.
Can the design doc in the "Idea" section explain this?
Can we write a summary and put it in a code comment?
Yes. I will add it later.
```scala
}
logDebug(s"number of skewed partitions is ${skewedPartitions.size}")
if (skewedPartitions.nonEmpty) {
  val optimizedSmj = smj.transformDown {
```
You can just rewrite the SMJ directly, right? The alternative is that you use transformDown and rewrite the ShuffleQueryStageExecs directly.
Yes. I will update later.
```scala
@@ -87,6 +87,10 @@ case class AdaptiveSparkPlanExec(
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     ReuseAdaptiveSubquery(conf, context.subqueryCache),
     // Here the 'OptimizeSkewedPartitions' rule should be executed
```
comment out-of-date
OK. I will update later.
```scala
 * @param startMapIndex The start map index.
 * @param endMapIndex The end map index.
 */
case class SkewedPartitionReaderExec(
```
Like with the RDDs: in general I would be in favor of creating one reader node that can deal with the different kinds of shuffle reads. That avoids a sprawl of readers, and it also allows us to create a much simpler plan if we can just use 1 reader with a join instead of using a union.
Yes. We also plan to further optimize the skew reader to make the plan simpler later.
```scala
@@ -579,6 +579,153 @@ class AdaptiveQueryExecSuite
    }
  }

  test("SPARK-29544: adaptive skew join with different join types") {
```
Can we do something to increase coverage for different join types?
```scala
def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
  case smj @ SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,
      s1 @ SortExec(_, _, left: ShuffleQueryStageExec, _),
```
We don't necessarily have a `Sort` in SMJ.
Yes. We may also be able to optimize the following use case:
```
SMJ
  Shuffle
  Shuffle
```
```scala
}

def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
  case _: LocalShuffleReaderExec => Nil
```
We could lose an optimization opportunity here. You could have a situation where we converted an SMJ to a BHJ and there is a shuffled join on top of this. Anyway, this will require some more changes.
This might even be a correctness problem if the LocalShuffleReader produces a partitioning that is actually leveraged later in the stage.
Good idea. We can do further optimization later.
We can remove these "reader" cases here:
- this rule applies first, so we won't see these readers anyway.
- even if this rule is not the first to apply, we should not skip them, otherwise we could miscount. We are not matching them in transform anyway, so we are safe to ignore them here.
@JkSelf I am very excited about this work, this will improve the end-user experience for a lot of users. I have left some additional comments; I hope you don't mind that I am late to the party.
A follow-up commit referenced on this PR ("…in optimizations", from #27493):

### What changes were proposed in this pull request?

This is a followup of #26434. This PR uses one special shuffle reader for skew join, so that we only have one join after optimization. In order to do that, this PR

1. adds a very general `CustomShuffledRowRDD` which supports all kinds of partition arrangements.
2. moves the logic of coalescing shuffle partitions to a util function, and calls it during skew join optimization, to totally decouple it from the `ReduceNumShufflePartitions` rule. It's too complicated to interleave skew join with `ReduceNumShufflePartitions`, as you need to consider the size of split partitions which don't respect the target size already.

### Why are the changes needed?

The current skew join optimization has a serious performance issue: the size of the query plan depends on the number and size of skewed partitions.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing tests; the UI was tested manually. Explain output:

```
AdaptiveSparkPlan(isFinalPlan=true)
+- OverwriteByExpression org.apache.spark.sql.execution.datasources.noop.NoopTable$403a2ed5, [AlwaysTrue()], org.apache.spark.sql.util.CaseInsensitiveStringMap1f
   +- *(5) SortMergeJoin(skew=true) [key1#2L], [key2#6L], Inner
      :- *(3) Sort [key1#2L ASC NULLS FIRST], false, 0
      :  +- SkewJoinShuffleReader 2 skewed partitions with size(max=5 KB, min=5 KB, avg=5 KB)
      :     +- ShuffleQueryStage 0
      :        +- Exchange hashpartitioning(key1#2L, 200), true, [id=#53]
      :           +- *(1) Project [(id#0L % 2) AS key1#2L]
      :              +- *(1) Filter isnotnull((id#0L % 2))
      :                 +- *(1) Range (0, 100000, step=1, splits=6)
      +- *(4) Sort [key2#6L ASC NULLS FIRST], false, 0
         +- SkewJoinShuffleReader 2 skewed partitions with size(max=5 KB, min=5 KB, avg=5 KB)
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(key2#6L, 200), true, [id=#64]
                  +- *(2) Project [((id#4L % 2) + 1) AS key2#6L]
                     +- *(2) Filter isnotnull(((id#4L % 2) + 1))
                        +- *(2) Range (0, 100000, step=1, splits=6)
```

Closes #27493 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: herman <herman@databricks.com>
What changes were proposed in this pull request?
Skew join is common and can severely degrade the performance of queries, especially those with joins. This PR aims to optimize skew joins based on runtime map output statistics by adding an "OptimizeSkewedPartitions" rule. The detailed design doc is here. Currently we support the "Inner, Cross, LeftSemi, LeftAnti, LeftOuter, RightOuter" join types.
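Roughly, for each skewed reduce partition the rule reads sub-ranges of the mappers on each side and unions the resulting sort-merge joins with the join over the remaining partitions. A simplified, illustrative plan shape (not actual explain output from this PR):

```
Union
:- SortMergeJoin                                   -- skewed partition P, one pair of mapper ranges
:  :- SkewedPartitionReader(P, left,  mappers [a, b))
:  +- SkewedPartitionReader(P, right, mappers [c, d))
:- SortMergeJoin                                   -- ...one join per pair of mapper ranges
+- SortMergeJoin                                   -- all non-skewed partitions, read as usual
```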
Why are the changes needed?
To optimize skewed partitions at runtime based on AQE.
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT
We have tested this PR with the following SQL in a 5-node cluster.
The main Spark configuration is:
This PR gains about a 6x performance improvement (27s vs. 162s). The UI with and without this PR is shown below.
With this PR: (UI screenshot)
Without this PR: (UI screenshot)