Skip to content

Commit cdc7dbe

Browse files
update
1 parent 290cb4d commit cdc7dbe

File tree

3 files changed

+27
-15
lines changed

3 files changed

+27
-15
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1033,7 +1033,7 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
10331033
object EliminateSorts extends Rule[LogicalPlan] {
10341034
def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
10351035

1036-
val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
1036+
private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
10371037
case s @ Sort(orders, _, child) if orders.isEmpty || orders.exists(_.child.foldable) =>
10381038
val newOrders = orders.filterNot(_.child.foldable)
10391039
if (newOrders.isEmpty) {

sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import org.apache.spark.sql.internal.SQLConf
2323

2424
/**
2525
* Remove redundant SortExec node from the spark plan. A sort node is redundant when
26-
* its child satisfies both its sort orders and its required child distribution.
26+
* its child satisfies both its sort orders and its required child distribution. Note
27+
* this rule differs from the Optimizer rule EliminateSorts in that this rule also checks
28+
* if the child satisfies the required distribution so that it is safe to remove not only a
29+
* local sort but also a global sort when its child already satisfies the required sort orders.
2730
*/
2831
object RemoveRedundantSorts extends Rule[SparkPlan] {
2932
def apply(plan: SparkPlan): SparkPlan = {

sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ abstract class RemoveRedundantSortsSuiteBase
6464
withTempView("t1", "t2") {
6565
spark.range(1000).select('id as "key").createOrReplaceTempView("t1")
6666
spark.range(1000).select('id as "key").createOrReplaceTempView("t2")
67-
67+
6868
val queryTemplate = """
6969
|SELECT /*+ BROADCAST(%s) */ t1.key FROM
7070
| (SELECT key FROM t1 WHERE key > 10 ORDER BY key DESC LIMIT 10) t1
@@ -74,17 +74,25 @@ abstract class RemoveRedundantSortsSuiteBase
7474
|ORDER BY %s
7575
""".stripMargin
7676

77-
val innerJoinAsc = queryTemplate.format("t1", "t2.key ASC")
78-
checkSorts(innerJoinAsc, 1, 1)
79-
80-
val innerJoinDesc = queryTemplate.format("t1", "t2.key DESC")
81-
checkSorts(innerJoinDesc, 0, 1)
82-
83-
val innerJoinDesc1 = queryTemplate.format("t1", "t1.key DESC")
84-
checkSorts(innerJoinDesc1, 1, 1)
85-
86-
val leftOuterJoinDesc = queryTemplate.format("t2", "t1.key DESC")
87-
checkSorts(leftOuterJoinDesc, 0, 1)
77+
// No sort should be removed since the stream side (t2) order DESC
78+
// does not satisfy the required sort order ASC.
79+
val buildLeftOrderByRightAsc = queryTemplate.format("t1", "t2.key ASC")
80+
checkSorts(buildLeftOrderByRightAsc, 1, 1)
81+
82+
// The top sort node should be removed since the stream side (t2) order DESC already
83+
// satisfies the required sort order DESC.
84+
val buildLeftOrderByRightDesc = queryTemplate.format("t1", "t2.key DESC")
85+
checkSorts(buildLeftOrderByRightDesc, 0, 1)
86+
87+
// No sort should be removed since the sort ordering from broadcast-hash join is based
88+
// on the stream side (t2) and the required sort order is from t1.
89+
val buildLeftOrderByLeftDesc = queryTemplate.format("t1", "t1.key DESC")
90+
checkSorts(buildLeftOrderByLeftDesc, 1, 1)
91+
92+
// The top sort node should be removed since the stream side (t1) order DESC already
93+
// satisfies the required sort order DESC.
94+
val buildRightOrderByLeftDesc = queryTemplate.format("t2", "t1.key DESC")
95+
checkSorts(buildRightOrderByLeftDesc, 0, 1)
8896
}
8997
}
9098

@@ -104,7 +112,8 @@ abstract class RemoveRedundantSortsSuiteBase
104112
val queryAsc = query + " ASC"
105113
checkSorts(queryAsc, 2, 3)
106114

107-
// Top level sort should only be eliminated if it's order is descending with SMJ.
115+
// The top-level sort should not be removed since the child output ordering is ASC and
116+
// the required ordering is DESC.
108117
val queryDesc = query + " DESC"
109118
checkSorts(queryDesc, 3, 3)
110119
}

0 commit comments

Comments
 (0)