File tree Expand file tree Collapse file tree 1 file changed +2
-10
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution Expand file tree Collapse file tree 1 file changed +2
-10
lines changed Original file line number Diff line number Diff line change @@ -144,20 +144,12 @@ case class Aggregate(
144144
145145 var i = 0
146146 while (i < buffer.length) {
147- aggregateResults(i) = buffer(i).apply (EmptyRow )
147+ aggregateResults(i) = buffer(i).eval (EmptyRow )
148148 i += 1
149149 }
150150
151151 Iterator (resultProjection(aggregateResults))
152152 }
153- buildRow(aggImplementations.map(_.eval(group)))
154- }
155-
156- // TODO: THIS BREAKS PIPELINING, DOUBLE COMPUTES THE ANSWER, AND USES TOO MUCH MEMORY...
157- if (groupingExpressions.isEmpty && result.count == 0 ) {
158- // When there there is no output to the Aggregate operator, we still output an empty row.
159- val aggImplementations = createAggregateImplementations()
160- sc.makeRDD(buildRow(aggImplementations.map(_.eval(null ))) :: Nil )
161153 } else {
162154 child.execute().mapPartitions { iter =>
163155 val hashTable = new HashMap [Row , Array [AggregateFunction ]]
@@ -198,7 +190,7 @@ case class Aggregate(
198190 while (i < currentBuffer.length) {
199191 // Evaluating an aggregate buffer returns the result. No row is required since we
200192 // already added all rows in the group using update.
201- aggregateResults(i) = currentBuffer(i).apply (EmptyRow )
193+ aggregateResults(i) = currentBuffer(i).eval (EmptyRow )
202194 i += 1
203195 }
204196 resultProjection(joinedRow(aggregateResults, currentGroup))
You can’t perform that action at this time.
0 commit comments