
Commit c122cca

Address comments, add tests
1 parent b2e8ef3 commit c122cca

6 files changed: +78 −37 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala

Lines changed: 6 additions & 0 deletions

@@ -156,6 +156,12 @@ class JoinedRow extends Row {
 
 /**
  * JIT HACK: Replace with macros
+ * The `JoinedRow` class is used in many performance-critical situations. Unfortunately, since
+ * there are multiple different types of `Row` that could be stored as `row1` and `row2`, most of
+ * the calls in the critical path are polymorphic. By creating special versions of this class that
+ * are used in only a single location of the code, we increase the chance that only a single type
+ * of Row will be referenced, increasing the opportunity for the JIT to play tricks. This sounds
+ * crazy, but in benchmarks it had noticeable effects.
  */
 class JoinedRow2 extends Row {
   private[this] var row1: Row = _
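
To make the new comment concrete: the payoff comes from keeping each call site monomorphic. Below is a minimal sketch of the idea, not code from this patch — `SimpleRow`, `ArrayRow`, and `JoinedRowForHashJoin` are hypothetical stand-ins for the real classes.

// Sketch only: one dedicated joined-row class per use site keeps the JVM's
// type profile at `apply` monomorphic, so the JIT can devirtualize and inline.
trait SimpleRow {
  def apply(i: Int): Any
  def length: Int
}

final class ArrayRow(values: Array[Any]) extends SimpleRow {
  def apply(i: Int): Any = values(i)
  def length: Int = values.length
}

// Used from exactly one operator; a second operator would get its own copy
// of this class rather than sharing it, as the comment above describes.
final class JoinedRowForHashJoin(row1: SimpleRow, row2: SimpleRow) extends SimpleRow {
  def apply(i: Int): Any = if (i < row1.length) row1(i) else row2(i - row1.length)
  def length: Int = row1.length + row2.length
}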

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala

Lines changed: 40 additions & 34 deletions

@@ -20,42 +20,43 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.types._
 
 /**
+ * A parent class for mutable container objects that are reused when the values are changed,
+ * resulting in less garbage. These values are held by a [[SpecificMutableRow]].
  *
+ * The following code was roughly used to generate these objects:
+ * {{{
+ * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
+ * types.map {tpe =>
+ *   s"""
+ *   final class Mutable$tpe extends MutableValue {
+ *     var value: $tpe = 0
+ *     def boxed = if (isNull) null else value
+ *     def update(v: Any) = value = {
+ *       isNull = false
+ *       v.asInstanceOf[$tpe]
+ *     }
+ *     def copy() = {
+ *       val newCopy = new Mutable$tpe
+ *       newCopy.isNull = isNull
+ *       newCopy.value = value
+ *       newCopy.asInstanceOf[this.type]
+ *     }
+ *   }"""
+ * }.foreach(println)
  *
+ * types.map { tpe =>
+ *   s"""
+ *   override def set$tpe(ordinal: Int, value: $tpe): Unit = {
+ *     val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
+ *     currentValue.isNull = false
+ *     currentValue.value = value
+ *   }
  *
-  {{{
-  val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
-  types.map {tpe =>
-    s"""
-    final class Mutable$tpe extends MutableValue {
-      var value: $tpe = 0
-      def boxed = if (isNull) null else value
-      def update(v: Any) = value = {
-        isNull = false
-        v.asInstanceOf[$tpe]
-      }
-      def copy() = {
-        val newCopy = new Mutable$tpe
-        newCopy.isNull = isNull
-        newCopy.value = value
-        newCopy.asInstanceOf[this.type]
-      }
-    }"""
-  }.foreach(println)
-
-  types.map { tpe =>
-    s"""
-    override def set$tpe(ordinal: Int, value: $tpe): Unit = {
-      val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
-      currentValue.isNull = false
-      currentValue.value = value
-    }
-
-    override def get$tpe(i: Int): $tpe = {
-      values(i).asInstanceOf[Mutable$tpe].value
-    }"""
-  }.foreach(println)
-  }}}
+ * override def get$tpe(i: Int): $tpe = {
+ *   values(i).asInstanceOf[Mutable$tpe].value
+ * }"""
+ * }.foreach(println)
+ * }}}
 */
 abstract class MutableValue extends Serializable {
   var isNull: Boolean = true

@@ -184,7 +185,12 @@ final class MutableAny extends MutableValue {
   }
 }
 
-class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow {
+/**
+ * A row type that holds an array of specialized container objects, of type [[MutableValue]],
+ * chosen based on the dataTypes of each column. The intent is to decrease garbage when modifying
+ * the values of primitive columns.
+ */
+final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow {
 
   def this(dataTypes: Seq[DataType]) =
     this(
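
For reference, here is roughly what the generator template in the scaladoc above expands to for `Int`. This is a reconstruction from the snippet (substituting `$tpe` = `Int`), not the verbatim contents of SpecificRow.scala.

// Expansion of the scaladoc template for tpe = Int (reconstruction).
final class MutableInt extends MutableValue {
  var value: Int = 0
  def boxed = if (isNull) null else value
  def update(v: Any) = value = {
    isNull = false
    v.asInstanceOf[Int]
  }
  def copy() = {
    val newCopy = new MutableInt
    newCopy.isNull = isNull
    newCopy.value = value
    newCopy.asInstanceOf[this.type]
  }
}

Because `value` is a primitive field on a reused container object, repeated `setInt` calls on a `SpecificMutableRow` overwrite it in place rather than allocating a boxed value per update — the "less garbage" the scaladoc refers to.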

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala

Lines changed: 14 additions & 1 deletion

@@ -93,12 +93,25 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
 
   override def children = left :: right :: Nil
 
-  override def references = (left.flatMap(_.references) ++ right.flatMap(_.references)).toSet
+  override def references = left.references ++ right.references
 
   override def dataType = left.dataType
 
   override def eval(input: Row): Any = {
     val leftEval = left.eval(input)
+    val rightEval = right.eval(input)
+    if (leftEval == null) {
+      rightEval
+    } else if (rightEval == null) {
+      leftEval
+    } else {
+      val numeric = left.dataType.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
+      if (numeric.compare(leftEval, rightEval) < 0) {
+        rightEval
+      } else {
+        leftEval
+      }
+    }
   }
 
   override def toString = s"MaxOf($left, $right)"
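
The new `eval` gives `MaxOf` coalescing null semantics: a null on either side yields the other side, and only two non-null values are compared through the column's `Numeric`. The same logic restated in isolation (a hypothetical standalone helper, not part of the patch):

// Standalone restatement of the null handling in the new MaxOf.eval (sketch).
def maxOf(leftEval: Any, rightEval: Any, numeric: Numeric[Any]): Any =
  if (leftEval == null) rightEval
  else if (rightEval == null) leftEval
  else if (numeric.compare(leftEval, rightEval) < 0) rightEval
  else leftEval

So `maxOf(null, 2, ...)` returns 2 rather than null, which is exactly what the new test cases in ExpressionEvaluationSuite below assert.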

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 7 additions & 1 deletion

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 
+// These classes are here to avoid issues with serialization and integration with quasiquotes.
 class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]
 class LongHashSet extends org.apache.spark.util.collection.OpenHashSet[Long]

@@ -53,6 +54,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
   private val curId = new java.util.concurrent.atomic.AtomicInteger()
   private val javaSeparator = "$"
 
+  /**
+   * Can be flipped on manually in the console to add (expensive) expression evaluation trace code.
+   */
+  var debugLogging = false
+
   /**
    * Generates a class for a given input expression. Called when there is not cached code
    * already available.

@@ -496,7 +502,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
 
     // Only inject debugging code if debugging is turned on.
     val debugCode =
-      if (false) {
+      if (debugLogging) {
        val localLogger = log
        val localLoggerTree = reify { localLogger }
        q"""

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala

Lines changed: 10 additions & 0 deletions

@@ -136,6 +136,16 @@ class ExpressionEvaluationSuite extends FunSuite {
     checkEvaluation(In(Literal(1), Seq(Literal(1), Literal(2))) && In(Literal(2), Seq(Literal(1), Literal(2))), true)
   }
 
+  test("MaxOf") {
+    checkEvaluation(MaxOf(1, 2), 2)
+    checkEvaluation(MaxOf(2, 1), 2)
+    checkEvaluation(MaxOf(1L, 2L), 2L)
+    checkEvaluation(MaxOf(2L, 1L), 2L)
+
+    checkEvaluation(MaxOf(Literal(null, IntegerType), 2), 2)
+    checkEvaluation(MaxOf(2, Literal(null, IntegerType)), 2)
+  }
+
   test("LIKE literal Regular Expression") {
     checkEvaluation(Literal(null, StringType).like("a"), null)
     checkEvaluation(Literal("a", StringType).like(Literal(null, StringType)), null)

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -158,7 +158,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
       case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
     }
     assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold,
-      s"query should contain two relations, each of which has size smaller than autoConvertSize instead ${rdd.queryExecution}")
+      s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
     // Using `sparkPlan` because for relevant patterns in HashJoin to be
    // matched, other strategies need to be applied.
