[SPARK-29544] [SQL] optimize skewed partition based on data size #26434

Closed
wants to merge 24 commits into from

Contributor

@JkSelf JkSelf commented Nov 8, 2019

What changes were proposed in this pull request?

Skewed joins are common and can severely degrade query performance. This PR aims to optimize skewed joins based on the runtime map output statistics by adding the "OptimizeSkewedPartitions" rule. The detailed design doc is here. Currently the "Inner, Cross, LeftSemi, LeftAnti, LeftOuter, RightOuter" join types are supported.

Why are the changes needed?

To optimize skewed partitions at runtime based on AQE.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

We have tested this PR with the following SQL on a 5-node cluster.

spark.range(0, 100000, 1, 6).selectExpr("id % 2 as key1").createOrReplaceTempView("test1");
spark.range(0, 100000, 1, 6).selectExpr("id % 2 +1 as key2").createOrReplaceTempView("test2");
import org.apache.spark.sql.SaveMode.Overwrite
spark.sql("select * from test1, test2 where key1 = key2").write.format("noop").mode(Overwrite).save()
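To see why this query is skewed, the key distribution can be worked out without Spark. The sketch below uses plain Scala collections (the object name `SkewedKeysSketch` is only for illustration): each side has just two distinct keys, and only key `1` appears on both sides, so all matching rows land in a single shuffle partition on each side.

```scala
object SkewedKeysSketch {
  // Mirrors spark.range(0, 100000) with "id % 2 as key1": keys 0 and 1.
  val key1Counts: Map[Long, Long] =
    (0L until 100000L).groupBy(_ % 2).map { case (k, v) => k -> v.size.toLong }

  // Mirrors "id % 2 + 1 as key2": keys 1 and 2.
  val key2Counts: Map[Long, Long] =
    (0L until 100000L).groupBy(_ % 2 + 1).map { case (k, v) => k -> v.size.toLong }

  // Keys present on both sides, and the number of joined rows they produce.
  val matchingKeys: Set[Long] = key1Counts.keySet.intersect(key2Counts.keySet)
  val joinedRows: Long =
    matchingKeys.toSeq.map(k => key1Counts(k) * key2Counts(k)).sum
}
```

With 50,000 rows per key on each side, the single matching key yields 2.5 billion joined rows that one reduce task would have to produce without skew handling.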

The main spark configuration is:

spark.sql.shuffle.partitions 500
spark.sql.autoBroadcastJoinThreshold -1
spark.sql.adaptive.enabled true
spark.sql.adaptive.shuffle.localShuffleReader.enabled false
spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled false
spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold 500

This PR gives about a 6x performance improvement (27s vs 162s). The following shows the UI with and without this PR.
With this PR:
image

Without this PR:
image

@JkSelf
Contributor Author

JkSelf commented Nov 8, 2019

@cloud-fan Please help me review. Thanks for your help.

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113434 has finished for PR 26434 at commit c64df90.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class OptimizeSkewedPartitions(conf: SQLConf) extends Rule[SparkPlan]
  • case class SkewedShuffleReaderExec(
  • class SkewedShuffledRowRDD(

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113435 has finished for PR 26434 at commit f0f03c5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113444 has finished for PR 26434 at commit 5f56f89.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JkSelf
Contributor Author

JkSelf commented Nov 8, 2019

Please help to retest. Thanks.

@maropu
Member

maropu commented Nov 9, 2019

retest this please

@SparkQA

SparkQA commented Nov 9, 2019

Test build #113484 has finished for PR 26434 at commit 5f56f89.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Get a reader for the specific partitionIndex in map output statistics that are
* produced by range mappers. Called on executors by reduce tasks.
*/
def getReaderForRangeMapper[K, C](
Contributor

We can probably merge this method with getReaderForOneMapper; one mapper is just a special case of a range of mappers.

}
}

case class SkewedShuffleReaderExec(
Contributor

This is different from local/coalesced shuffle reader as it reads only one reduce partition. Maybe better to call it PostShufflePartitionReader

val size = metrics.bytesByPartitionId(partitionId)
val factor = size / medianSize
val numMappers = getShuffleStage(stage).
plan.shuffleDependency.rdd.partitions.length
Contributor

we can easily get the data size of each mapper, shall we split mapper ranges based on data size?

@@ -279,6 +279,24 @@ private[sql] trait SQLTestData { self =>
df
}

protected lazy val skewData1: DataFrame = {
Contributor

Let's define the data where it is used. This file should only contain dataframes that are used by multiple test suites.

@SparkQA

SparkQA commented Nov 12, 2019

Test build #113622 has finished for PR 26434 at commit bbf585d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class PostShufflePartitionReader(

* and the second item is a sequence of (shuffle block id, shuffle block size, map index)
* tuples describing the shuffle blocks that are stored at that block manager.
*/
def convertMapStatuses(
Contributor

can we merge the two convertMapStatuses?

@@ -116,7 +116,8 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
class ShuffledRowRDD(
var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
metrics: Map[String, SQLMetric],
- specifiedPartitionStartIndices: Option[Array[Int]] = None)
+ specifiedPartitionStartIndices: Option[Array[Int]] = None,
Contributor

how about Option[Array[(Int, Int)]]?

Contributor Author

Maybe defining specifiedPartitionStartIndices and specifiedPartitionEndIndices separately is clearer?

Contributor

Option[Array[(Int, Int)]] is more type-safe. It eliminates problems like

  1. specifiedPartitionStartIndices is specified but specifiedPartitionEndIndices is not.
  2. these 2 have different lengths.
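The two failure modes listed above can be made concrete with a small sketch. It is plain Scala, not code from the PR; `rangesFromSeparate` and `rangesFromPairs` are hypothetical helpers that contrast the two parameter shapes.

```scala
object PartitionRangesSketch {
  // Two parallel optional arrays: callers can pass inconsistent inputs,
  // so both problems have to be checked at runtime.
  def rangesFromSeparate(
      starts: Option[Array[Int]],
      ends: Option[Array[Int]]): Option[Array[(Int, Int)]] = (starts, ends) match {
    case (Some(s), Some(e)) =>
      require(s.length == e.length, "start and end indices must have the same length")
      Some(s.zip(e))
    case (None, None) => None
    case _ =>
      throw new IllegalArgumentException("start and end indices must be set together")
  }

  // One optional array of (start, end) pairs: neither problem can be expressed,
  // so no runtime validation is needed.
  def rangesFromPairs(ranges: Option[Array[(Int, Int)]]): Option[Array[(Int, Int)]] =
    ranges
}
```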

partitionId, rightMapIdStartIndices(j), rightEndMapId)

subJoins +=
SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,
Contributor

just for brainstorm: here we are joining 2 partitions, not 2 RDDs, so there will be no shuffle. Is it better to run hash join than SMJ? cc @maryannxue

Contributor

Furthermore, if only one side is skewed, it may be better to plan a broadcast hash join instead of a union of many SMJs?

@JkSelf JkSelf force-pushed the skewedPartitionBasedSize branch from bbf585d to 18cdcd9 Compare December 3, 2019 02:48
@JkSelf
Contributor Author

JkSelf commented Dec 3, 2019

@cloud-fan I fixed the conflicts and addressed the comments. Please review again. Thanks for your help.

var postMapPartitionSize: Long = mapPartitionSize(i)
partitionStartIndices += i
while (i < numMappers && i + 1 < numMappers) {
val nextIndex = if (i + 1 < numMappers) {
Contributor

Isn't this always true ?

Contributor Author

@manuzhang Thanks for your review. After an offline discussion with Wenchen, we decided to remove this method and split the skewed partition by the number of mappers.

i + 1
} else numMappers -1

if (postMapPartitionSize + mapPartitionSize(nextIndex) > advisoryTargetPostShuffleInputSize) {
Contributor

What if this never comes true when adaptiveSkewedSizeThreshold is smaller than targetPostShuffleInputSize ? I'm also wondering whether targetPostShuffleInputSize can be reused for the threshold.

@SparkQA

SparkQA commented Dec 3, 2019

Test build #114751 has finished for PR 26434 at commit 18cdcd9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
case None =>
Iterator.empty
}
}



Contributor

nit: unnecessary change

@@ -410,6 +410,27 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED = buildConf("spark.sql.adaptive.skewedJoin.enabled")
Contributor

nit: spark.sql.adaptive.optimizeSkewedJoin.enabled

buildConf("spark.sql.adaptive.skewedPartitionFactor")
.doc("A partition is considered as a skewed partition if its size is larger than" +
" this factor multiple the median partition size and also larger than " +
"spark.sql.adaptive.skewedPartitionSizeThreshold.")
Contributor

nit: ${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key} instead of hardcode
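The rule stated in the config doc string (larger than the factor times the median, and also larger than the size threshold) can be sketched as a pure function. This is an illustration in plain Scala rather than the PR's code; `isSkewed` and `skewedPartitionIds` are hypothetical names, and sizes are assumed to be in bytes.

```scala
object SkewDetectionSketch {
  // A partition counts as skewed only if it exceeds BOTH the relative bound
  // (factor * median) and the absolute bound (sizeThreshold).
  def isSkewed(size: Long, medianSize: Long, factor: Int, sizeThreshold: Long): Boolean =
    size > medianSize * factor && size > sizeThreshold

  // Indices of all skewed partitions in a per-partition size array.
  def skewedPartitionIds(
      sizes: Array[Long],
      medianSize: Long,
      factor: Int,
      sizeThreshold: Long): Seq[Int] =
    sizes.indices.filter(i => isSkewed(sizes(i), medianSize, factor, sizeThreshold))
}
```

The absolute threshold keeps tiny partitions from being flagged when the median is very small; the relative factor keeps uniformly large partitions from being flagged.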


def adaptiveSkewedFactor: Int = getConf(ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR)

def adaptiveSkewedSizeThreshold: Long =
Contributor

nit: we don't need these 3 methods if they are only called 1 or 2 times.

val shuffleStageCheck = ShuffleQueryStageExec.isShuffleQueryStageExec(leftStage) &&
ShuffleQueryStageExec.isShuffleQueryStageExec(rightStage)
val statisticsReady: Boolean = if (shuffleStageCheck) {
getStatistics(leftStage) != null && getStatistics(rightStage) != null
Contributor

how can the stats be null?

Contributor Author

It seems they cannot be null. I wrongly assumed that leftStage and rightStage might not finish at the same time.

joinTypeSupported && statisticsReady
}

private def supportSplitOnLeftPartition(joinType: JoinType) = joinType != RightOuter
Contributor

nit: it's more robust to list the supported join types.
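A small sketch of the difference, using hypothetical stand-ins for Spark's join type objects so it runs on its own. The whitelist is derived from the join types the PR description lists as supported, minus RightOuter; it is not copied from the final code.

```scala
object JoinTypeSketch {
  // Hypothetical stand-ins for Spark's JoinType objects.
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Exclusion form from the diff: any other join type is silently accepted.
  def supportSplitOnLeftByExclusion(joinType: JoinType): Boolean =
    joinType != RightOuter

  // Listing form: only explicitly named join types are accepted.
  def supportSplitOnLeftByListing(joinType: JoinType): Boolean = joinType match {
    case Inner | Cross | LeftSemi | LeftAnti | LeftOuter => true
    case _ => false
  }
}
```

The two forms agree on the six supported join types but differ on anything else, such as `FullOuter`, which is why the listing form is the safer default if the caller's checks ever change.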

@JkSelf
Contributor Author

JkSelf commented Dec 9, 2019

@cloud-fan I addressed the comments made online and offline. Please help me review again. Thanks.

@SparkQA

SparkQA commented Jan 13, 2020

Test build #4991 has finished for PR 26434 at commit cee1c8c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 13, 2020

Test build #4992 has finished for PR 26434 at commit cee1c8c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jan 13, 2020

Test build #116654 has finished for PR 26434 at commit cee1c8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 14, 2020

Test build #116695 has finished for PR 26434 at commit ac17a7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in a2aa966 Jan 14, 2020
val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
val maxSplits = math.min(conf.getConf(
SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
Contributor

@hvanhovell hvanhovell Jan 14, 2020

Why do we need this config? It seems a bit weird that we try to use the actual size everywhere else, and we are not doing it here. I think that just using the SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD for the target partition size by itself should yield good results.

Contributor Author

This is based on feedback from @manuzhang's environment. In some use cases the number of splits may be very large (more than 1000). There will then be too many SMJs, and it may take a long time to launch the job after optimizing the skewed join. The UI will also be choked by the huge graph. So we added this configuration as an upper limit on the number of splits.

val partitionStartIndices = ArrayBuffer[Int]()
var postMapPartitionSize = mapPartitionSizes(0)
partitionStartIndices += 0
partitionIndices.drop(1).foreach { nextPartitionIndex =>
Contributor

In some case writing a while loop is actually easier to understand, e.g.:

var postMapPartitionSize = advisoryPartitionSize + 1
var nextPartitionIndex = 0
while (nextPartitionIndex < mapPartitionSizes.length) {
  val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex)
  if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
    partitionStartIndices += nextPartitionIndex
    postMapPartitionSize = 0
  }
  postMapPartitionSize += nextMapPartitionSize
  nextPartitionIndex += 1
}
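The suggested loop can be wrapped in a self-contained function to check its behavior. This is a sketch in plain Scala; `splitStartIndices` and the object name are illustrative and not part of the PR, and `advisoryPartitionSize` is assumed to be a target size in bytes.

```scala
import scala.collection.mutable.ArrayBuffer

object SplitByMapSizeSketch {
  // Returns the map indices at which a new split of the skewed partition starts.
  def splitStartIndices(
      mapPartitionSizes: Array[Long],
      advisoryPartitionSize: Long): Array[Int] = {
    val partitionStartIndices = ArrayBuffer[Int]()
    // Start above the target so the first mapper always opens a split.
    var postMapPartitionSize = advisoryPartitionSize + 1
    var nextPartitionIndex = 0
    while (nextPartitionIndex < mapPartitionSizes.length) {
      val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex)
      // Adding this mapper would exceed the target: close the split, open a new one.
      if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
        partitionStartIndices += nextPartitionIndex
        postMapPartitionSize = 0
      }
      postMapPartitionSize += nextMapPartitionSize
      nextPartitionIndex += 1
    }
    partitionStartIndices.toArray
  }
}
```

For map output sizes `10, 20, 30, 40, 50` and a target of `50`, the loop starts new splits at indices `0, 2, 3, 4`: only the first two mappers fit together under the target.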

Contributor Author

Ok. I will update later.

}
}

if (partitionStartIndices.size > maxSplits) {
Contributor

@hvanhovell hvanhovell Jan 14, 2020

This makes the last partition larger right? Isn't that adding some skew?

Contributor Author

Yes, it may cause the last partition to be skewed. This approach cannot solve the extremely skewed use case.
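The effect discussed here can be illustrated with a sketch. It assumes the cap is applied by keeping only the first `maxSplits` start indices, so the last kept split absorbs all remaining mappers; that strategy, and the helper names `capSplits` and `splitSizes`, are assumptions for illustration and are not confirmed by the diff excerpt.

```scala
object MaxSplitsSketch {
  // Assumed capping strategy: keep the first maxSplits start indices.
  def capSplits(startIndices: Array[Int], maxSplits: Int): Array[Int] =
    if (startIndices.length > maxSplits) startIndices.take(maxSplits) else startIndices

  // Total bytes in each split, where split i covers mappers [start(i), start(i + 1)).
  def splitSizes(mapPartitionSizes: Array[Long], startIndices: Array[Int]): Array[Long] = {
    val ends = startIndices.drop(1) :+ mapPartitionSizes.length
    startIndices.zip(ends).map { case (s, e) => mapPartitionSizes.slice(s, e).sum }
  }
}
```

With six mappers of 10 bytes each, one split per mapper, and a cap of 3, the resulting split sizes are `10, 10, 40`: the tail is folded into the last split.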

* This RDD takes a [[ShuffleDependency]] (`dependency`), a partitionIndex
* and the range of startMapIndex to endMapIndex.
*/
class SkewedShuffledRowRDD(
Contributor

Why do we need a separate implementation of the shuffled row RDD? I am wondering if we can combine them all, and have a couple of partition implementations depending on which (mapper/reducer) coordinate we need to read from.

Contributor

BTW bifurcation of the query plan (part hash join, part smj) is slightly orthogonal to this.

Contributor

This sounds like a good idea.
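One way to picture the combined design is a single reader driven by a small family of partition descriptions. The sketch below is purely illustrative: `ShuffleReadSpec`, `ReducerRange`, `MapperRange` and `blocks` are hypothetical names, not classes from this PR or from Spark.

```scala
object ShuffleReadSpecSketch {
  // Hypothetical description of what one output partition reads.
  sealed trait ShuffleReadSpec

  // Read reducer partitions [startReducer, endReducer) from every mapper
  // (covers the normal and the coalesced case).
  case class ReducerRange(startReducer: Int, endReducer: Int) extends ShuffleReadSpec

  // Read one reducer partition, but only from mappers [startMap, endMap)
  // (covers one slice of a skewed partition).
  case class MapperRange(reducerId: Int, startMap: Int, endMap: Int) extends ShuffleReadSpec

  // The (mapIndex, reducerId) shuffle block coordinates a spec resolves to.
  def blocks(spec: ShuffleReadSpec, numMappers: Int): Seq[(Int, Int)] = spec match {
    case ReducerRange(s, e) => for (m <- 0 until numMappers; r <- s until e) yield (m, r)
    case MapperRange(r, s, e) => for (m <- s until e) yield (m, r)
  }
}
```

A single RDD could then hold an array of such specs, one per output partition, instead of needing a separate RDD class per read pattern.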

private def medianSize(stats: MapOutputStatistics): Long = {
val numPartitions = stats.bytesByPartitionId.length
val bytes = stats.bytesByPartitionId.sorted
if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1
Contributor

math.max(bytes(numPartitions / 2), 1)?

OCD: you could argue that this median calculation is incorrect for an even number of elements. In that case it should be (bytes(numPartitions / 2 - 1) + bytes(numPartitions / 2)) / 2

Contributor Author

Good catch. I will update it later.
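A standalone sketch of the median with both points addressed, written against a plain `Array[Long]` instead of `MapOutputStatistics` so it can be run directly. Note that a floor of 1 needs `math.max`, and that for an even count the two middle elements of the sorted array sit at indices `n / 2 - 1` and `n / 2`.

```scala
object MedianSizeSketch {
  // Median partition size in bytes, never smaller than 1 so it is safe to divide by.
  def medianSize(bytesByPartitionId: Array[Long]): Long = {
    require(bytesByPartitionId.nonEmpty, "need at least one partition")
    val bytes = bytesByPartitionId.sorted
    val n = bytes.length
    val median =
      if (n % 2 == 0) (bytes(n / 2 - 1) + bytes(n / 2)) / 2 // average the two middle values
      else bytes(n / 2)                                     // single middle value
    math.max(median, 1L)
  }
}
```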


val leftMedSize = medianSize(leftStats)
val rightMedSize = medianSize(rightStats)
val leftSizeInfo = s"median size: $leftMedSize, max size: ${leftStats.bytesByPartitionId.max}"
Contributor

NIT you can avoid materializing the string if debug logging is not enabled by making these defs.

Contributor Author

Ok. I will update later.
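The point about `def` versus `val` can be shown with a small sketch. It assumes a logger whose message parameter is by-name, which is how `logDebug` is declared in Spark's `Logging` trait; the `debugEnabled` flag and `buildCount` counter are test scaffolding invented for this example.

```scala
object LazyLogStringSketch {
  var debugEnabled = false
  var buildCount = 0 // how many times the size string was actually built

  // By-name parameter: the message expression is evaluated only if it is used.
  def logDebug(msg: => String): Unit = if (debugEnabled) println(msg)

  def run(sizes: Array[Long]): Unit = {
    // As a def, the string is built on each use instead of eagerly at definition.
    def sizeInfo = { buildCount += 1; s"max size: ${sizes.max}" }
    logDebug(s"left side partitions, $sizeInfo")
  }
}
```

With a `val`, the string (including the `max` scan over all partition sizes) would be computed on every call even when debug logging is off.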

val shuffleStages = collectShuffleStages(plan)

if (shuffleStages.length == 2) {
// Currently we only support handling skewed join for 2 table join.
Contributor

Is this being planned?

Contributor

Is it that we want to avoid things like:

SMJ
  SMJ (w/ or w/o Sort)
    Shuffle (w/ or w/o Sort)
    Shuffle (w/ or w/o Sort)
  Shuffle (w/ or w/o Sort)

i.e., A SMJ that contains a child SMJ without a Shuffle on top, so we don't wanna optimize the child SMJ coz it changes the outputPartitioning? If so, can we make that clearer in the comment?

Contributor

@maryannxue maryannxue Jan 14, 2020

and since it affects the outputPartitioning, we need to check if it breaks the operators above.

Contributor

There are scenarios where we can do this for multiple joins, right? INNER for example.

Contributor Author

It may be too complex to optimize multiple joins, so we optimize the 2-table join first. We can further optimize multiple joins later.

Contributor Author

@maryannxue Yes, we need to consider the effect on outputPartitioning. I will update this in the following PRs.

stage.shuffle.shuffleDependency.rdd.partitions.length
}

def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
Contributor

In general it would be good to have some documentation for this method. What you are doing here is not completely trivial :). The same applies for the code inside the method.

Contributor Author

Would the "Idea" section of the design doc be enough to explain it?

Contributor

Can we do a summary and put it as code comment?

Contributor Author

Yes. I will add later.

}
logDebug(s"number of skewed partitions is ${skewedPartitions.size}")
if (skewedPartitions.nonEmpty) {
val optimizedSmj = smj.transformDown {
Contributor

You can just rewrite the SMJ directly right? The alternative is that you use transform down and rewrite the ShuffleQueryStageExec's directly.

Contributor Author

Yes. I will update later.

@@ -87,6 +87,10 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, context.subqueryCache),
// Here the 'OptimizeSkewedPartitions' rule should be executed
Contributor

comment out-of-date

Contributor Author

Ok. I will update later.

* @param startMapIndex The start map index.
* @param endMapIndex The end map index.
*/
case class SkewedPartitionReaderExec(
Contributor

@hvanhovell hvanhovell Jan 14, 2020

Like with the RDDs: in general I would be in favor of creating one reader node that can deal with the different kinds of shuffle reads. That avoids a sprawl of readers, and it also allows us to create a much simpler plan if we can just use 1 reader with a join instead of using a union.

Contributor Author

Yes. We also plan to further optimize the skew reader to make the plan simpler later.

@@ -579,6 +579,153 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-29544: adaptive skew join with different join types") {
Contributor

Can we do something to increase coverage for different join types?


def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
case smj @ SortMergeJoinExec(leftKeys, rightKeys, joinType, condition,
s1 @ SortExec(_, _, left: ShuffleQueryStageExec, _),
Contributor

we don't necessarily have a Sort in SMJ.

Contributor Author

Yes. We may also be able to optimize the following use case:

SMJ 
    Shuffle 
    Shuffle

}

def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
case _: LocalShuffleReaderExec => Nil
Contributor

We could lose an optimization opportunity here. You could have a situation where we converted an SMJ to a BHJ and there is a shuffled join on top of it. Anyway, this will require some more changes.

Contributor

This might even be a correctness problem if the LocalShuffleReader produces a partitioning that is actually leveraged later in the stage.

Contributor Author

Good idea. We can do further optimization later.

Contributor

We can remove these "reader" cases here:

  1. this rule applies first, so we won't see these readers anyway.
  2. even if this rule is not the first to apply, we should not skip them otherwise we could miscount. we are not matching them in transform anyway, so we are safe to ignore them here.

@hvanhovell
Contributor

@JkSelf I am very excited about this work, this will improve the end-user experience for a lot of users. I have left some additional comments; I hope you don't mind that I am late to the party.

hvanhovell pushed a commit that referenced this pull request Feb 13, 2020
…in optimizations

### What changes were proposed in this pull request?
This is a followup of #26434

This PR uses one special shuffle reader for skew join, so that only one join remains after optimization. To do that, this PR:
1. adds a very general `CustomShuffledRowRDD` which supports all kinds of partition arrangements.
2. moves the logic of coalescing shuffle partitions to a util function and calls it during skew join optimization, fully decoupling it from the `ReduceNumShufflePartitions` rule. Making skew join interact with `ReduceNumShufflePartitions` is too complicated, because you would need to account for split partitions whose sizes already don't respect the target size.

### Why are the changes needed?
The current skew join optimization has a serious performance issue: the size of the query plan depends on the number and size of skewed partitions.

### Does this PR introduce any user-facing change?
no

### How was this patch tested?
existing tests

test UI manually:
![image](https://user-images.githubusercontent.com/3182036/74357390-cfb30480-4dfa-11ea-83f6-825d1b9379ca.png)

explain output
```
AdaptiveSparkPlan(isFinalPlan=true)
+- OverwriteByExpression org.apache.spark.sql.execution.datasources.noop.NoopTable$403a2ed5, [AlwaysTrue()], org.apache.spark.sql.util.CaseInsensitiveStringMap1f
   +- *(5) SortMergeJoin(skew=true) [key1#2L], [key2#6L], Inner
      :- *(3) Sort [key1#2L ASC NULLS FIRST], false, 0
      :  +- SkewJoinShuffleReader 2 skewed partitions with size(max=5 KB, min=5 KB, avg=5 KB)
      :     +- ShuffleQueryStage 0
      :        +- Exchange hashpartitioning(key1#2L, 200), true, [id=#53]
      :           +- *(1) Project [(id#0L % 2) AS key1#2L]
      :              +- *(1) Filter isnotnull((id#0L % 2))
      :                 +- *(1) Range (0, 100000, step=1, splits=6)
      +- *(4) Sort [key2#6L ASC NULLS FIRST], false, 0
         +- SkewJoinShuffleReader 2 skewed partitions with size(max=5 KB, min=5 KB, avg=5 KB)
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(key2#6L, 200), true, [id=#64]
                  +- *(2) Project [((id#4L % 2) + 1) AS key2#6L]
                     +- *(2) Filter isnotnull(((id#4L % 2) + 1))
                        +- *(2) Range (0, 100000, step=1, splits=6)
```

Closes #27493 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: herman <herman@databricks.com>
hvanhovell pushed a commit that referenced this pull request Feb 13, 2020
…in optimizations

### What changes were proposed in this pull request?
This is a followup of #26434

This PR uses one special shuffle reader for skew join, so that only one join remains after optimization. To do that, this PR:
1. adds a very general `CustomShuffledRowRDD` which supports all kinds of partition arrangements.
2. moves the logic of coalescing shuffle partitions to a util function and calls it during skew join optimization, fully decoupling it from the `ReduceNumShufflePartitions` rule. Making skew join interact with `ReduceNumShufflePartitions` is too complicated, because you would need to account for split partitions whose sizes already don't respect the target size.

### Why are the changes needed?
The current skew join optimization has a serious performance issue: the size of the query plan depends on the number and size of skewed partitions.
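
To make the scaling concrete, the following sketch compares join-node counts under a simplified model. It assumes, purely for illustration, that the earlier approach plans one sub-join per pair of left and right splits of each skewed partition, plus one join for the remaining partitions; the object and method names are hypothetical.

```scala
object PlanSizeSketch {
  // splits: for each skewed partition, (number of left splits, number of right splits).

  // Assumed model of the earlier approach: one sub-join per (left split, right split)
  // pair of every skewed partition, plus one join for the non-skewed partitions.
  def joinsWithSubJoins(splits: Seq[(Int, Int)]): Int =
    1 + splits.map { case (left, right) => left * right }.sum

  // With a single special shuffle reader, the plan keeps exactly one join
  // regardless of how many partitions are skewed or how they are split.
  def joinsWithSingleReader(splits: Seq[(Int, Int)]): Int = 1
}
```

Under this model, two skewed partitions split as (5, 1) and (3, 4) would already require 1 + 5 + 12 = 18 join nodes, whereas the single-reader plan stays at one.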

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

The UI was also checked manually:
![image](https://user-images.githubusercontent.com/3182036/74357390-cfb30480-4dfa-11ea-83f6-825d1b9379ca.png)

`explain` output:
```
AdaptiveSparkPlan(isFinalPlan=true)
+- OverwriteByExpression org.apache.spark.sql.execution.datasources.noop.NoopTable$403a2ed5, [AlwaysTrue()], org.apache.spark.sql.util.CaseInsensitiveStringMap1f
   +- *(5) SortMergeJoin(skew=true) [key1#2L], [key2#6L], Inner
      :- *(3) Sort [key1#2L ASC NULLS FIRST], false, 0
      :  +- SkewJoinShuffleReader 2 skewed partitions with size(max=5 KB, min=5 KB, avg=5 KB)
      :     +- ShuffleQueryStage 0
      :        +- Exchange hashpartitioning(key1#2L, 200), true, [id=#53]
      :           +- *(1) Project [(id#0L % 2) AS key1#2L]
      :              +- *(1) Filter isnotnull((id#0L % 2))
      :                 +- *(1) Range (0, 100000, step=1, splits=6)
      +- *(4) Sort [key2#6L ASC NULLS FIRST], false, 0
         +- SkewJoinShuffleReader 2 skewed partitions with size(max=5 KB, min=5 KB, avg=5 KB)
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(key2#6L, 200), true, [id=#64]
                  +- *(2) Project [((id#4L % 2) + 1) AS key2#6L]
                     +- *(2) Filter isnotnull(((id#4L % 2) + 1))
                        +- *(2) Range (0, 100000, step=1, splits=6)
```

Closes #27493 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: herman <herman@databricks.com>
(cherry picked from commit a4ceea6)
Signed-off-by: herman <herman@databricks.com>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
…in optimizations
