Skip to content

Commit 2cbb3e4

Browse files
adrian-wang authored and marmbrus committed
[SPARK-5642] [SQL] Apply column pruning on unused aggregation fields
select k from (select key k, max(value) v from src group by k) t

Author: Daoyuan Wang <daoyuan.wang@intel.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #4415 from adrian-wang/groupprune and squashes the following commits:

5d2d8a3 [Daoyuan Wang] address Michael's comments
61f8ef7 [Daoyuan Wang] add a unit test
80ddcc6 [Daoyuan Wang] keep project
b69d385 [Daoyuan Wang] add a prune rule for grouping set
1 parent 5d3cc6b commit 2cbb3e4

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,15 @@ object ColumnPruning extends Rule[LogicalPlan] {
119119
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
120120
a.copy(child = Project(a.references.toSeq, child))
121121

122+
case p @ Project(projectList, a @ Aggregate(groupingExpressions, aggregateExpressions, child))
123+
if (a.outputSet -- p.references).nonEmpty =>
124+
Project(
125+
projectList,
126+
Aggregate(
127+
groupingExpressions,
128+
aggregateExpressions.filter(e => p.references.contains(e)),
129+
child))
130+
122131
// Eliminate unneeded attributes from either side of a Join.
123132
case Project(projectList, Join(left, right, joinType, condition)) =>
124133
// Collect the list of all references required either above or to evaluate the condition.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 35 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
1919

2020
import org.apache.spark.sql.catalyst.analysis
2121
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
22-
import org.apache.spark.sql.catalyst.expressions.Explode
22+
import org.apache.spark.sql.catalyst.expressions.{Count, Explode}
2323
import org.apache.spark.sql.catalyst.plans.logical._
2424
import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
2525
import org.apache.spark.sql.catalyst.rules._
@@ -37,7 +37,8 @@ class FilterPushdownSuite extends PlanTest {
3737
CombineFilters,
3838
PushPredicateThroughProject,
3939
PushPredicateThroughJoin,
40-
PushPredicateThroughGenerate) :: Nil
40+
PushPredicateThroughGenerate,
41+
ColumnPruning) :: Nil
4142
}
4243

4344
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -58,6 +59,38 @@ class FilterPushdownSuite extends PlanTest {
5859
comparePlans(optimized, correctAnswer)
5960
}
6061

62+
test("column pruning for group") {
63+
val originalQuery =
64+
testRelation
65+
.groupBy('a)('a, Count('b))
66+
.select('a)
67+
68+
val optimized = Optimize(originalQuery.analyze)
69+
val correctAnswer =
70+
testRelation
71+
.select('a)
72+
.groupBy('a)('a)
73+
.select('a).analyze
74+
75+
comparePlans(optimized, correctAnswer)
76+
}
77+
78+
test("column pruning for group with alias") {
79+
val originalQuery =
80+
testRelation
81+
.groupBy('a)('a as 'c, Count('b))
82+
.select('c)
83+
84+
val optimized = Optimize(originalQuery.analyze)
85+
val correctAnswer =
86+
testRelation
87+
.select('a)
88+
.groupBy('a)('a as 'c)
89+
.select('c).analyze
90+
91+
comparePlans(optimized, correctAnswer)
92+
}
93+
6194
// After this line is unimplemented.
6295
test("simple push down") {
6396
val originalQuery =

0 commit comments

Comments
 (0)