@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.types.IntegerType
 
-/*
+/**
  * This rule rewrites an aggregate query with distinct aggregations into an expanded double
  * aggregation in which the regular aggregation expressions and every distinct clause is aggregated
  * in a separate group. The results are then combined in a second aggregate.
@@ -125,6 +125,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
           // we must expand at least one of the children (here we take the first child),
           // or If we don't, we will get the wrong result, for example:
           // count(distinct 1) will be explained to count(1) after the rewrite function.
+          // Generally, the distinct aggregateFunction should not run
+          // foldable TypeCheck for the first child.
           e.aggregateFunction.children.take(1).toSet
         }
       }
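A hedged illustration of the wrong result the comment warns about, reusing the hypothetical `spark` session and `data` view from the sketch above: if the foldable argument were dropped entirely, COUNT(DISTINCT 1) would degenerate into COUNT(1).

// COUNT(DISTINCT 1) counts a single distinct constant (1 for any non-empty input),
// whereas COUNT(1) counts rows; keeping the first child in the expansion preserves
// that distinction even when every child of the distinct aggregate is foldable.
spark.sql("SELECT COUNT(DISTINCT 1) AS d, COUNT(1) AS c FROM data").show()
// Expected with the three-row sample above: d = 1, c = 3.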
@@ -144,8 +146,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
 
     // Functions used to modify aggregate functions and their inputs.
     def evalWithinGroup(id: Literal, e: Expression) = If(EqualTo(gid, id), e, nullify(e))
-    def patchAggregateFunctionChildren(af: AggregateFunction)(
-        attrs: Expression => Option[Expression]): AggregateFunction = {
+    def patchAggregateFunctionChildren(
+        af: AggregateFunction)(
+        attrs: Expression => Option[Expression]): AggregateFunction = {
       val newChildren = af.children.map(c => attrs(c).getOrElse(c))
       af.withNewChildren(newChildren).asInstanceOf[AggregateFunction]
     }
@@ -251,8 +254,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
 
     // Construct the second aggregate
     val transformations: Map[Expression, Expression] =
-      (distinctAggOperatorMap.flatMap(_._2) ++
-        regularAggOperatorMap.map(e => (e._1, e._3))).toMap
+      (distinctAggOperatorMap.flatMap(_._2) ++
+        regularAggOperatorMap.map(e => (e._1, e._3))).toMap
 
     val patchedAggExpressions = a.aggregateExpressions.map { e =>
       e.transformDown {
@@ -275,9 +278,9 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
   private def nullify(e: Expression) = Literal.create(null, e.dataType)
 
   private def expressionAttributePair(e: Expression) =
-    // We are creating a new reference here instead of reusing the attribute in case of a
-    // NamedExpression. This is done to prevent collisions between distinct and regular aggregate
-    // children, in this case attribute reuse causes the input of the regular aggregate to bound to
-    // the (nulled out) input of the distinct aggregate.
+    // We are creating a new reference here instead of reusing the attribute in case of a
+    // NamedExpression. This is done to prevent collisions between distinct and regular aggregate
+    // children, in this case attribute reuse causes the input of the regular aggregate to bound to
+    // the (nulled out) input of the distinct aggregate.
     e -> AttributeReference(e.sql, e.dataType, nullable = true)()
 }
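A small sketch of why the comment above insists on a fresh reference rather than reusing a NamedExpression's attribute. This pokes at Catalyst's internal API purely for illustration: constructing a new AttributeReference assigns a new exprId, so the regular aggregate's input cannot accidentally rebind to the nulled-out distinct input.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// A fresh AttributeReference gets its own exprId even when the name and type match an
// existing column, so it can never be confused with (or bound to) that column.
val existing = AttributeReference("value", IntegerType, nullable = true)()
val fresh = AttributeReference(existing.sql, existing.dataType, nullable = true)()
assert(existing.exprId != fresh.exprId)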