
Commit bca89e2

wangyum authored and GitHub Enterprise committed
[SPARK-36183][SQL][FOLLOWUP] Fix push down limit 1 through Aggregate (#861)
* [SPARK-36183][SQL][FOLLOWUP] Fix push down limit 1 through Aggregate

### What changes were proposed in this pull request?

Use `Aggregate.aggregateExpressions` instead of `Aggregate.output` when pushing down limit 1 through Aggregate. For example:

```scala
spark.range(10).selectExpr("id % 5 AS a", "id % 5 AS b").write.saveAsTable("t1")
spark.sql("SELECT a, b, a AS alias FROM t1 GROUP BY a, b LIMIT 1").explain(true)
```

Before this PR:

```
== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- !Project [a#227L, b#228L, alias#226L]
      +- LocalLimit 1
         +- Relation default.t1[a#227L,b#228L] parquet
```

After this PR:

```
== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
   +- Project [a#227L, b#228L, a#227L AS alias#226L]
      +- LocalLimit 1
         +- Relation default.t1[a#227L,b#228L] parquet
```

### Why are the changes needed?

Fixes a bug: rebuilding the `Project` from `Aggregate.output` drops the child expression of an alias such as `a AS alias`, leaving an unresolvable `!Project` in the optimized plan.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #35286 from wangyum/SPARK-36183-2.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 9b12571)
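The root cause is the difference between `Aggregate.output` and `Aggregate.aggregateExpressions`: `output` flattens every item to a bare attribute reference, so an alias like `a AS alias` loses its defining expression and the rebuilt `Project` can no longer be resolved. A minimal way to see the two lists side by side, assuming a `spark-shell` session with the `t1` table from the repro above:

```scala
import org.apache.spark.sql.catalyst.plans.logical.Aggregate

// Grab the analyzed Aggregate node of the repro query.
val agg = spark.sql("SELECT a, b, a AS alias FROM t1 GROUP BY a, b")
  .queryExecution.analyzed.collectFirst { case a: Aggregate => a }.get

// aggregateExpressions keeps the full named expressions, alias included
// (a, b, a AS alias) -- safe to rebuild a Project from.
agg.aggregateExpressions.foreach(e => println(e.sql))

// output holds only attribute references (a, b, alias); once the Aggregate
// is replaced by a Project, `alias` has no defining expression left.
agg.output.foreach(a => println(a.sql))
```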
1 parent cdf792c · commit bca89e2

2 files changed: +15 −2 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -548,9 +548,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
 
     // Push down limit 1 through Aggregate and turn Aggregate into Project if it is group only.
     case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly =>
-      Limit(le, Project(a.output, LocalLimit(le, a.child)))
+      Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child)))
     case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly =>
-      Limit(le, p.copy(child = Project(a.output, LocalLimit(le, a.child))))
+      Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child))))
   }
 }
```
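Note that both cases are guarded by `a.groupOnly`: the Aggregate computes no aggregate functions and every select item is (an alias of) a grouping expression, so it is effectively a `SELECT DISTINCT`. Only then is pushing `LIMIT 1` below it safe, because any single input row already yields a valid one-row distinct result. A sketch of which queries qualify (my reading of the guard, not text from the patch):

```scala
// groupOnly: both select items are grouping expressions, so the Aggregate is
// equivalent to SELECT DISTINCT a, b -- LIMIT 1 can be pushed below it.
spark.sql("SELECT a, b FROM t1 GROUP BY a, b LIMIT 1")

// Not groupOnly: sum(b) is a genuine aggregate over the whole group, so
// pushing LIMIT 1 below the Aggregate would change the result; no rewrite.
spark.sql("SELECT a, sum(b) FROM t1 GROUP BY a LIMIT 1")
```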

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 13 additions & 0 deletions
```diff
@@ -182,6 +182,19 @@ class LimitPushdownSuite extends PlanTest {
       Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).select("x.a".attr).limit(1).analyze),
       LocalLimit(1, x).select("x.a".attr).select("x.a".attr).limit(1).analyze)
 
+    comparePlans(
+      Optimize.execute(x.union(y).groupBy("x.a".attr)("x.a".attr).limit(1).analyze),
+      LocalLimit(1, LocalLimit(1, x).union(
+        LocalLimit(1, y.select('a.as("a"), 'b.as("b"), 'c.as("c"))))).select("x.a".attr)
+        .limit(1).analyze)
+
+    comparePlans(
+      Optimize.execute(
+        x.groupBy("x.a".attr)("x.a".attr)
+          .select("x.a".attr.as("a1"), "x.a".attr.as("a2")).limit(1).analyze),
+      LocalLimit(1, x).select("x.a".attr)
+        .select("x.a".attr.as("a1"), "x.a".attr.as("a2")).limit(1).analyze)
+
     // No push down
     comparePlans(
       Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze),
```
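Beyond the unit test, the repro from the description doubles as an end-to-end check. A sketch, assuming a local `spark` session and relying on the detail (visible in the plans above) that an invalid node prints with a `!` prefix:

```scala
// Rebuild the repro table and query from the PR description.
spark.range(10).selectExpr("id % 5 AS a", "id % 5 AS b").write.saveAsTable("t1")
val optimized = spark.sql("SELECT a, b, a AS alias FROM t1 GROUP BY a, b LIMIT 1")
  .queryExecution.optimizedPlan

// Before this patch the plan contained `!Project [...]`; with the fix the
// alias keeps its child expression and the plan resolves cleanly.
assert(!optimized.toString.contains("!Project"))
```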
