
Commit a2de20c

jackylee-chstczwd authored and committed
[SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
### Why are the changes needed?

`EnsureRequirements` adds a `ShuffleExchangeExec` (RangePartitioning) below the Sort even when a `RoundRobinPartitioning` exchange is already underneath it. This causes two shuffles, and the number of partitions in the final stage is not the number specified by `RoundRobinPartitioning`.

**Example SQL**
```
SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a
```

**BEFORE**
```
== Physical Plan ==
*(1) Sort [a#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 200), true, [id=#11]
   +- Exchange RoundRobinPartitioning(5), false, [id=#9]
      +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1]
```

**AFTER**
```
== Physical Plan ==
*(1) Sort [a#0 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#0 ASC NULLS FIRST, 5), true, [id=#11]
   +- Scan hive default.test [a#0, b#1], HiveTableRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#0, b#1]
```

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Ran the existing suites and added new tests for this.

Closes #26946 from stczwd/RoundRobinPartitioning.

Lead-authored-by: lijunqing <lijunqing@baidu.com>
Co-authored-by: stczwd <qcsd2011@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
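To see the fix in action, here is a minimal reproduction sketch for spark-shell. It is not part of the patch: the `spark` session is the shell's default, and the `test` view is created here purely for illustration (the commit message's example uses a Hive table of the same name).

```scala
// Illustrative only: create a throwaway view named `test` to mirror the example above.
spark.range(0, 1000).selectExpr("id AS a", "id AS b").createOrReplaceTempView("test")

val q = spark.sql("SELECT /*+ REPARTITION(5) */ * FROM test ORDER BY a")
q.explain()  // with this fix, expect a single Exchange rangepartitioning(a ASC ..., 5)
assert(q.rdd.getNumPartitions == 5)  // the final stage keeps the 5 partitions from the hint
```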

File tree: 3 files changed (+51 / -5 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -55,6 +55,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         child
       case (child, BroadcastDistribution(mode)) =>
         BroadcastExchangeExec(mode, child)
+      case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) =>
+        ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(defaultNumPreShufflePartitions)
```
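Reading the new case: when the child of a node that requires an `OrderedDistribution` (a global sort) is already a `ShuffleExchangeExec`, for example one inserted by a REPARTITION hint, the rule replaces that exchange's partitioning with the range partitioning the sort needs, reusing the existing exchange's partition count rather than stacking a second exchange sized by `spark.sql.shuffle.partitions` on top. The following toy sketch models that pattern match with simplified stand-in types; they are not the Catalyst classes, and the snippet is meant to be pasted into a Scala REPL.

```scala
// Simplified stand-ins, not the real Catalyst Partitioning/Distribution/SparkPlan classes.
sealed trait Partitioning { def numPartitions: Int }
case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
case class RangePartitioning(sortCols: Seq[String], numPartitions: Int) extends Partitioning

case class OrderedDistribution(sortCols: Seq[String]) {
  // The real OrderedDistribution.createPartitioning also yields a RangePartitioning.
  def createPartitioning(numPartitions: Int): Partitioning =
    RangePartitioning(sortCols, numPartitions)
}

sealed trait Plan
case class Scan(name: String) extends Plan
case class ShuffleExchange(partitioning: Partitioning, child: Plan) extends Plan

def ensureOrdering(child: Plan, required: OrderedDistribution, defaultNum: Int = 200): Plan =
  child match {
    // Essence of the added case: the child is already a shuffle, so reuse its partition
    // count for the range shuffle the sort needs instead of adding a second exchange.
    case ShuffleExchange(partitioning, grandChild) =>
      ShuffleExchange(required.createPartitioning(partitioning.numPartitions), grandChild)
    // Otherwise fall back to the default shuffle partition number, as before the patch.
    case other =>
      ShuffleExchange(required.createPartitioning(defaultNum), other)
  }

// A REPARTITION(5) shuffle under an ORDER BY collapses into one 5-partition range shuffle.
println(ensureOrdering(ShuffleExchange(RoundRobinPartitioning(5), Scan("test")),
  OrderedDistribution(Seq("a"))))
// ShuffleExchange(RangePartitioning(List(a),5),Scan(test))
```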

sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala

Lines changed: 3 additions & 5 deletions
```diff
@@ -39,9 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {
     def computeChiSquareTest(): Double = {
       val n = 10000
       // Trigger a sort
-      // Range has range partitioning in its output now. To have a range shuffle, we
-      // need to run a repartition first.
-      val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc)
+      val data = spark.range(0, n, 1, 10).sort($"id".desc)
         .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

       // Compute histogram for the number of records per partition post sort
@@ -55,12 +53,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {

     withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
       // The default chi-sq value should be low
-      assert(computeChiSquareTest() < 100)
+      assert(computeChiSquareTest() < 10)

       withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
         // If we only sample one point, the range boundaries will be pretty bad and the
         // chi-sq value would be very high.
-        assert(computeChiSquareTest() > 300)
+        assert(computeChiSquareTest() > 100)
       }
     }
   }
```
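For context, the suite's chi-square statistic measures how evenly the range shuffle spreads records across partitions: near-uniform partition sizes give a low value, badly chosen range boundaries give a high one, which is why the thresholds above bound it from both sides. A minimal hand-rolled sketch of that kind of goodness-of-fit statistic against a uniform expectation (illustrative only, not the suite's exact implementation):

```scala
// Chi-square goodness-of-fit against a uniform expectation over partition sizes.
def chiSquare(observed: Seq[Long]): Double = {
  val expected = observed.sum.toDouble / observed.size
  observed.map(o => (o - expected) * (o - expected) / expected).sum
}

// Evenly sized partitions give a value near 0; heavy skew gives a large value.
println(chiSquare(Seq(2000L, 2000L, 2000L, 2000L, 2000L)))  // 0.0
println(chiSquare(Seq(6000L, 2500L, 1000L,  400L,  100L)))  // large
```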

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 46 additions & 0 deletions
```diff
@@ -421,6 +421,52 @@ class PlannerSuite extends SharedSparkSession {
     }
   }

+  test("SPARK-30036: Remove unnecessary RoundRobinPartitioning " +
+    "if SortExec is followed by RoundRobinPartitioning") {
+    val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
+    val partitioning = RoundRobinPartitioning(5)
+    assert(!partitioning.satisfies(distribution))
+
+    val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
+      global = true,
+      child = ShuffleExchangeExec(
+        partitioning,
+        DummySparkPlan(outputPartitioning = partitioning)))
+    val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
+    assert(outputPlan.find {
+      case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
+      case _ => false
+    }.isEmpty,
+      "RoundRobinPartitioning should be changed to RangePartitioning")
+
+    val query = testData.select('key, 'value).repartition(2).sort('key.asc)
+    assert(query.rdd.getNumPartitions == 2)
+    assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50))
+  }
+
+  test("SPARK-30036: Remove unnecessary HashPartitioning " +
+    "if SortExec is followed by HashPartitioning") {
+    val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
+    val partitioning = HashPartitioning(Literal(1) :: Nil, 5)
+    assert(!partitioning.satisfies(distribution))
+
+    val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
+      global = true,
+      child = ShuffleExchangeExec(
+        partitioning,
+        DummySparkPlan(outputPartitioning = partitioning)))
+    val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
+    assert(outputPlan.find {
+      case ShuffleExchangeExec(_: HashPartitioning, _, _) => true
+      case _ => false
+    }.isEmpty,
+      "HashPartitioning should be changed to RangePartitioning")
+
+    val query = testData.select('key, 'value).repartition(5, 'key).sort('key.asc)
+    assert(query.rdd.getNumPartitions == 5)
+    assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 20))
+  }
+
   test("EnsureRequirements does not eliminate Exchange with different partitioning") {
     val distribution = ClusteredDistribution(Literal(1) :: Nil)
     val partitioning = HashPartitioning(Literal(2) :: Nil, 5)
```