
Commit 9b12571

wangyum authored and cloud-fan committed
[SPARK-36183][SQL][FOLLOWUP] Fix push down limit 1 through Aggregate
### What changes were proposed in this pull request?

Use `Aggregate.aggregateExpressions` instead of `Aggregate.output` when pushing down limit 1 through Aggregate. For example:

```scala
spark.range(10).selectExpr("id % 5 AS a", "id % 5 AS b").write.saveAsTable("t1")
spark.sql("SELECT a, b, a AS alias FROM t1 GROUP BY a, b LIMIT 1").explain(true)
```

Before this PR:

```
== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- !Project [a#227L, b#228L, alias#226L]
      +- LocalLimit 1
         +- Relation default.t1[a#227L,b#228L] parquet
```

After this PR:

```
== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Project [a#227L, b#228L, a#227L AS alias#226L]
      +- LocalLimit 1
         +- Relation default.t1[a#227L,b#228L] parquet
```

### Why are the changes needed?

Fixes a bug: `Aggregate.output` reduces each grouping expression to a bare attribute reference, so the rewritten `Project` references `alias#226L`, a column its new child no longer produces, leaving the plan unresolved (the `!Project` above).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #35286 from wangyum/SPARK-36183-2.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
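The root cause can be sketched with a toy model (hypothetical classes, not Spark's real Catalyst `NamedExpression` hierarchy): `output` keeps only attribute references, while `aggregateExpressions` preserves the full alias expressions that the replacement `Project` needs to evaluate `a AS alias` from the child's columns.

```scala
// Toy model (hypothetical, NOT Spark's real classes) of why projecting by
// `output` loses alias information while `aggregateExpressions` keeps it.
sealed trait NamedExpression { def name: String }
case class Attribute(name: String) extends NamedExpression
case class Alias(child: Attribute, name: String) extends NamedExpression

case class Aggregate(groupingExprs: Seq[NamedExpression]) {
  // aggregateExpressions keeps the full expressions, aliases included
  def aggregateExpressions: Seq[NamedExpression] = groupingExprs
  // output reduces every expression to a bare attribute reference
  def output: Seq[Attribute] = groupingExprs.map {
    case a: Attribute   => a
    case Alias(_, name) => Attribute(name)
  }
}

// SELECT a, b, a AS alias ... GROUP BY a, b
val agg = Aggregate(Seq(Attribute("a"), Attribute("b"),
  Alias(Attribute("a"), "alias")))

// `output` yields Attribute("alias"), a reference to a column the rewritten
// child no longer produces; `aggregateExpressions` keeps the Alias, which a
// Project can still evaluate from the child's columns a and b.
val byOutput = agg.output
val byAggExprs = agg.aggregateExpressions
```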
1 parent e362ef1 · commit 9b12571

File tree

2 files changed: +9 −2 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -721,9 +721,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
       LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, join)))
     // Push down limit 1 through Aggregate and turn Aggregate into Project if it is group only.
     case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly =>
-      Limit(le, Project(a.output, LocalLimit(le, a.child)))
+      Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child)))
     case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly =>
-      Limit(le, p.copy(child = Project(a.output, LocalLimit(le, a.child))))
+      Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child))))
   }
 }
```
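The shape of the rewrite can be sketched on a toy plan model (hypothetical case classes, not Spark's real `LogicalPlan` nodes): for a group-only aggregate under `LIMIT 1`, any single input row per group yields the same result as the aggregate, so the `Aggregate` can be replaced by a `Project` over its `aggregateExpressions` with a `LocalLimit(1)` pushed below it.

```scala
// Toy plan model (hypothetical, NOT Spark's real LogicalPlan classes)
// sketching the rewrite: Limit(1, Aggregate) becomes
// Limit(1, Project(aggregateExpressions, LocalLimit(1, child))).
sealed trait Plan
case class Relation(cols: Seq[String]) extends Plan
case class Aggregate(groupOnly: Boolean,
                     aggregateExpressions: Seq[String],
                     child: Plan) extends Plan
case class Project(exprs: Seq[String], child: Plan) extends Plan
case class LocalLimit(n: Int, child: Plan) extends Plan
case class Limit(n: Int, child: Plan) extends Plan

def pushDownLimitOne(plan: Plan): Plan = plan match {
  // Only valid when the aggregate is group-only (no aggregate functions):
  // then one row per group is exactly what LIMIT 1 needs.
  case Limit(1, a: Aggregate) if a.groupOnly =>
    Limit(1, Project(a.aggregateExpressions, LocalLimit(1, a.child)))
  case other => other
}

// SELECT a, b, a AS alias FROM t1 GROUP BY a, b LIMIT 1
val plan = Limit(1, Aggregate(groupOnly = true,
  Seq("a", "b", "a AS alias"), Relation(Seq("a", "b"))))
val rewritten = pushDownLimitOne(plan)
```

Note the fix: the `Project` carries the aggregate's full expression list (including `a AS alias`), not just its output attributes.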

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -254,6 +254,13 @@ class LimitPushdownSuite extends PlanTest {
       Optimize.execute(x.union(y).groupBy("x.a".attr)("x.a".attr).limit(1).analyze),
       LocalLimit(1, LocalLimit(1, x).union(LocalLimit(1, y))).select("x.a".attr).limit(1).analyze)
 
+    comparePlans(
+      Optimize.execute(
+        x.groupBy("x.a".attr)("x.a".attr)
+          .select("x.a".attr.as("a1"), "x.a".attr.as("a2")).limit(1).analyze),
+      LocalLimit(1, x).select("x.a".attr)
+        .select("x.a".attr.as("a1"), "x.a".attr.as("a2")).limit(1).analyze)
+
     // No push down
     comparePlans(
       Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze),
```
