Skip to content

Commit 38c7449

Browse files
committed
comments and style
1 parent 9153652 commit 38c7449

File tree

5 files changed

+28
-19
lines changed

5 files changed

+28
-19
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -203,15 +203,6 @@ case class CollectHashSetFunction(
203203
@transient
204204
val distinctValue = new InterpretedProjection(expr)
205205

206-
/*
207-
override def merge(other: MergableAggregateFunction): MergableAggregateFunction = {
208-
val otherSetIterator = other.asInstanceOf[CountDistinctFunction].seen.iterator
209-
while(otherSetIterator.hasNext) {
210-
seen.add(otherSetIterator.next())
211-
}
212-
this
213-
}*/
214-
215206
override def update(input: Row): Unit = {
216207
val evaluatedExpr = distinctValue(input)
217208
if (!evaluatedExpr.anyNull) {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
9797

9898
override def dataType = left.dataType
9999

100-
override def eval(input: Row): Any = ???
100+
override def eval(input: Row): Any = {
101+
val leftEval = left.eval(input)
102+
}
101103

102104
override def toString = s"MaxOf($left, $right)"
103105
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,18 @@ package org.apache.spark.sql.catalyst.expressions
2020
import org.apache.spark.sql.catalyst.types._
2121
import org.apache.spark.util.collection.OpenHashSet
2222

23+
/**
24+
* Creates a new set of the specified type
25+
*/
2326
case class NewSet(elementType: DataType) extends LeafExpression {
2427
type EvaluatedType = Any
2528

2629
def references = Set.empty
2730

2831
def nullable = false
2932

30-
// This is not completely accurate..
33+
// We are currently only using these Expressions internally for aggregation. However, if we ever
34+
// expose these to users we'll want to create a proper type instead of hijacking ArrayType.
3135
def dataType = ArrayType(elementType)
3236

3337
def eval(input: Row): Any = {
@@ -37,7 +41,10 @@ case class NewSet(elementType: DataType) extends LeafExpression {
3741
override def toString = s"new Set($dataType)"
3842
}
3943

40-
// THIS MUTATES ITS ARUGMENTS
44+
/**
45+
* Adds an item to a set.
46+
* For performance, this expression mutates its input during evaluation.
47+
*/
4148
case class AddItemToSet(item: Expression, set: Expression) extends Expression {
4249
type EvaluatedType = Any
4350

@@ -67,7 +74,10 @@ case class AddItemToSet(item: Expression, set: Expression) extends Expression {
6774
override def toString = s"$set += $item"
6875
}
6976

70-
// THIS MUTATES ITS ARUGMENTS
77+
/**
78+
* Combines the elements of two sets.
79+
* For performance, this expression mutates its left input set during evaluation.
80+
*/
7181
case class CombineSets(left: Expression, right: Expression) extends BinaryExpression {
7282
type EvaluatedType = Any
7383

@@ -97,6 +107,9 @@ case class CombineSets(left: Expression, right: Expression) extends BinaryExpres
97107
}
98108
}
99109

110+
/**
111+
* Returns the number of elements in the input set.
112+
*/
100113
case class CountSet(child: Expression) extends UnaryExpression {
101114
type EvaluatedType = Any
102115

@@ -112,4 +125,4 @@ case class CountSet(child: Expression) extends UnaryExpression {
112125
}
113126

114127
override def toString = s"$child.count()"
115-
}
128+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ case class GeneratedAggregate(
108108
val currentMax = AttributeReference("currentMax", expr.dataType, nullable = true)()
109109
val initialValue = Literal(null, expr.dataType)
110110
val updateMax = MaxOf(currentMax, expr)
111-
//If(IsNull(currentMax), expr, If(GreaterThan(currentMax, expr), currentMax, expr))
112111

113112
AggregateEvaluation(
114113
currentMax :: Nil,
@@ -128,8 +127,9 @@ case class GeneratedAggregate(
128127
set)
129128

130129
case CombineSetsAndCount(inputSet) =>
130+
val ArrayType(inputType) = inputSet.dataType
131131
val set = AttributeReference("hashSet", inputSet.dataType, nullable = false)()
132-
val initialValue = NewSet(IntegerType) // NOT TRUE
132+
val initialValue = NewSet(inputType)
133133
val collectSets = CombineSets(set, inputSet)
134134

135135
AggregateEvaluation(

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,13 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
4545
kryo.register(classOf[com.clearspring.analytics.stream.cardinality.HyperLogLog],
4646
new HyperLogLogSerializer)
4747
kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
48-
// Specific hashset must come first
48+
49+
// Specific hashsets must come first
4950
kryo.register(classOf[IntegerHashSet], new IntegerHashSetSerializer)
5051
kryo.register(classOf[LongHashSet], new LongHashSetSerializer)
51-
kryo.register(classOf[org.apache.spark.util.collection.OpenHashSet[_]], new OpenHashSetSerializer)
52+
kryo.register(classOf[org.apache.spark.util.collection.OpenHashSet[_]],
53+
new OpenHashSetSerializer)
54+
5255
kryo.setReferences(false)
5356
kryo.setClassLoader(Utils.getSparkClassLoader)
5457
new AllScalaRegistrar().apply(kryo)
@@ -188,4 +191,4 @@ private[sql] class LongHashSetSerializer extends Serializer[LongHashSet] {
188191
}
189192
set
190193
}
191-
}
194+
}

0 commit comments

Comments (0)