Skip to content

[SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - Class Splitting #18075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions sql/catalyst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ case class ScalaUDF(
val converterTerm = ctx.freshName("converter")
val expressionIdx = ctx.references.size - 1
ctx.addMutableState(converterClassName, converterTerm,
s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
s"$converterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
s"references[$expressionIdx]).getChildren().apply($index))).dataType());")
converterTerm
Expand All @@ -1005,7 +1005,7 @@ case class ScalaUDF(
// Generate codes used to convert the returned value of user-defined functions to Catalyst type
val catalystConverterTerm = ctx.freshName("catalystConverter")
ctx.addMutableState(converterClassName, catalystConverterTerm,
s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
s"$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToCatalystConverter($scalaUDF.dataType());")

val resultTerm = ctx.freshName("result")
Expand All @@ -1019,7 +1019,7 @@ case class ScalaUDF(

val funcTerm = ctx.freshName("udf")
ctx.addMutableState(funcClassName, funcTerm,
s"this.$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
s"$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")

// codegen for children expressions
val evals = children.map(_.genCode(ctx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import scala.util.control.NonFatal

import com.google.common.cache.{CacheBuilder, CacheLoader}
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
import org.codehaus.janino.util.ClassFile
Expand Down Expand Up @@ -113,7 +112,7 @@ class CodegenContext {
val idx = references.length
references += obj
val clsName = Option(className).getOrElse(obj.getClass.getName)
addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
addMutableState(clsName, term, s"$term = ($clsName) references[$idx];")
term
}

Expand Down Expand Up @@ -202,16 +201,6 @@ class CodegenContext {
partitionInitializationStatements.mkString("\n")
}

/**
* Holding all the functions those will be added into generated class.
*/
val addedFunctions: mutable.Map[String, String] =
mutable.Map.empty[String, String]

def addNewFunction(funcName: String, funcCode: String): Unit = {
addedFunctions += ((funcName, funcCode))
}

/**
* Holds expressions that are equivalent. Used to perform subexpression elimination
* during codegen.
Expand All @@ -233,10 +222,118 @@ class CodegenContext {
// The collection of sub-expression result resetting methods that need to be called on each row.
val subexprFunctions = mutable.ArrayBuffer.empty[String]

def declareAddedFunctions(): String = {
addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n")
val outerClassName = "OuterClass"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: private val


/**
* Holds the class and instance names to be generated, where `OuterClass` is a placeholder
* standing for whichever class is generated as the outermost class and which will contain any
* nested sub-classes. All other classes and instance names in this list will represent private,
* nested sub-classes.
*/
private val classes: mutable.ListBuffer[(String, String)] =
mutable.ListBuffer[(String, String)](outerClassName -> null)

// A map holding the current size in bytes of each class to be generated.
private val classSize: mutable.Map[String, Int] =
mutable.Map[String, Int](outerClassName -> 0)

// Nested maps holding function names and their code belonging to each class.
private val classFunctions: mutable.Map[String, mutable.Map[String, String]] =
mutable.Map(outerClassName -> mutable.Map.empty[String, String])

// Returns the size of the most recently added class.
private def currClassSize(): Int = classSize(classes.head._1)

// Returns the class name and instance name for the most recently added class.
private def currClass(): (String, String) = classes.head

// Adds a new class. Requires the class' name, and its instance name.
private def addClass(className: String, classInstance: String): Unit = {
classes.prepend(className -> classInstance)
classSize += className -> 0
classFunctions += className -> mutable.Map.empty[String, String]
}

/**
* Adds a function to the generated class. If the code for the `OuterClass` grows too large, the
* function will be inlined into a new private, nested class, and a class-qualified name for the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: class instance-qualified name

* function will be returned. Otherwise, the function will be inined to the `OuterClass` the
* simple `funcName` will be returned.
*
* @param funcName the class-unqualified name of the function
* @param funcCode the body of the function
* @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you give an example? I'm not very clear when we need this

Copy link
Author

@bdrillard bdrillard Jun 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, see the portion of doConsume in the Limit class where the stopEarly function is registered, https://github.com/apache/spark/pull/18075/files#diff-379cccace8699ca00b76ff5631222adeR73

In this section of code, the registration of the function is separate from the caller code, so unlike other changes in this patch, we have no way of informing the caller code what the potentially class-qualified name of the function would be if it were inlined to a nested class. Instead, the caller code for the function (in WholeStageCodegenExec), makes a hard assumption that stopEarly will be visible globally, that is, in the outer class. The caller is divorced from the function producer across classes, so it's not clear how to make a generated function name visible, but the hint to inline to just inline the function to the outer class fixes that issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me, as the stopEarly in Limit is going to override the stopEarly in BufferedRowIterator, we can only put it in outer class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, whole stage codegen is really tricky...

* can be necessary when a function is declared outside of the context
* it is eventually referenced and a returned qualified function name
* cannot otherwise be accessed.
* @return the name of the function, qualified by class if it will be inlined to a private,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

* nested sub-class
*/
def addNewFunction(
funcName: String,
funcCode: String,
inlineToOuterClass: Boolean = false): String = {
// The number of named constants that can exist in the class is limited by the Constant Pool
// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
// threshold of 1600k bytes to determine when a function should be inlined to a private, nested
// sub-class.
val (className, classInstance) = if (inlineToOuterClass) {
outerClassName -> ""
} else if (currClassSize > 1600000) {
val className = freshName("NestedClass")
val classInstance = freshName("nestedClassInstance")

addClass(className, classInstance)

className -> classInstance
} else {
currClass()
}

classSize(className) += funcCode.length
classFunctions(className) += funcName -> funcCode

if (className == outerClassName) {
funcName
} else {

s"$classInstance.$funcName"
}
}

/**
* Instantiates all nested, private sub-classes as objects to the `OuterClass`
*/
private[sql] def initNestedClasses(): String = {
// Nested, private sub-classes have no mutable state (though they do reference the outer class'
// mutable state), so we declare and initialize them inline to the OuterClass.
classes.filter(_._1 != outerClassName).map {
case (className, classInstance) =>
s"private $className $classInstance = new $className();"
}.mkString("\n")
}

/**
* Declares all function code that should be inlined to the `OuterClass`.
*/
private[sql] def declareAddedFunctions(): String = {
classFunctions(outerClassName).values.mkString("\n")
}

/**
* Declares all nested, private sub-classes and the function code that should be inlined to them.
*/
private[sql] def declareNestedClasses(): String = {
classFunctions.filterKeys(_ != outerClassName).map {
case (className, functions) =>
s"""
|private class $className {
| ${functions.values.mkString("\n")}
|}
""".stripMargin
}
}.mkString("\n")

final val JAVA_BOOLEAN = "boolean"
final val JAVA_BYTE = "byte"
final val JAVA_SHORT = "short"
Expand Down Expand Up @@ -556,8 +653,7 @@ class CodegenContext {
return 0;
}
"""
addNewFunction(compareFunc, funcCode)
s"this.$compareFunc($c1, $c2)"
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
case schema: StructType =>
val comparisons = GenerateOrdering.genComparisons(this, schema)
val compareFunc = freshName("compareStruct")
Expand All @@ -573,8 +669,7 @@ class CodegenContext {
return 0;
}
"""
addNewFunction(compareFunc, funcCode)
s"this.$compareFunc($c1, $c2)"
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
case _ =>
Expand Down Expand Up @@ -629,7 +724,9 @@ class CodegenContext {

/**
* Splits the generated code of expressions into multiple functions, because function has
* 64kb code size limit in JVM
* 64kb code size limit in JVM. If the class to which the function would be inlined would grow
* beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it
* instead, because classes have a constant pool limit of 65,536 named values.
*
* @param row the variable name of row that is used by expressions
* @param expressions the codes to evaluate expressions.
Expand Down Expand Up @@ -689,7 +786,6 @@ class CodegenContext {
|}
""".stripMargin
addNewFunction(name, code)
name
}

foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})"))
Expand Down Expand Up @@ -773,8 +869,6 @@ class CodegenContext {
|}
""".stripMargin

addNewFunction(fnName, fn)

// Add a state and a mapping of the common subexpressions that are associate with this
// state. Adding this expression to subExprEliminationExprMap means it will call `fn`
// when it is code generated. This decision should be a cost based one.
Expand All @@ -792,7 +886,7 @@ class CodegenContext {
addMutableState(javaType(expr.dataType), value,
s"$value = ${defaultValue(expr.dataType)};")

subexprFunctions += s"$fnName($INPUT_ROW);"
subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
val state = SubExprEliminationState(isNull, value)
e.foreach(subExprEliminationExprs.put(_, state))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,21 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
if (e.nullable) {
val isNull = s"isNull_$i"
val value = s"value_$i"
ctx.addMutableState("boolean", isNull, s"this.$isNull = true;")
ctx.addMutableState("boolean", isNull, s"$isNull = true;")
ctx.addMutableState(ctx.javaType(e.dataType), value,
s"this.$value = ${ctx.defaultValue(e.dataType)};")
s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
this.$isNull = ${ev.isNull};
this.$value = ${ev.value};
$isNull = ${ev.isNull};
$value = ${ev.value};
"""
} else {
val value = s"value_$i"
ctx.addMutableState(ctx.javaType(e.dataType), value,
s"this.$value = ${ctx.defaultValue(e.dataType)};")
s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
this.$value = ${ev.value};
$value = ${ev.value};
"""
}
}
Expand All @@ -87,7 +87,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP

val updates = validExpr.zip(index).map {
case (e, i) =>
val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
val ev = ExprCode("", s"isNull_$i", s"value_$i")
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
}

Expand Down Expand Up @@ -135,6 +135,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
$allUpdates
return mutableRow;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
$comparisons
return 0;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}"""

val code = CodeFormatter.stripOverlappingComments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
${eval.code}
return !${eval.isNull} && ${eval.value};
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}"""

val code = CodeFormatter.stripOverlappingComments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
val output = ctx.freshName("safeRow")
val values = ctx.freshName("values")
// These expressions could be split into multiple functions
ctx.addMutableState("Object[]", values, s"this.$values = null;")
ctx.addMutableState("Object[]", values, s"$values = null;")

val rowClass = classOf[GenericInternalRow].getName

Expand All @@ -65,10 +65,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
val allFields = ctx.splitExpressions(tmp, fieldWriters)
val code = s"""
final InternalRow $tmp = $input;
this.$values = new Object[${schema.length}];
$values = new Object[${schema.length}];
$allFields
final InternalRow $output = new $rowClass($values);
this.$values = null;
$values = null;
"""

ExprCode(code, "false", output)
Expand Down Expand Up @@ -184,6 +184,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
$allExpressions
return mutableRow;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val rowWriterClass = classOf[UnsafeRowWriter].getName
val rowWriter = ctx.freshName("rowWriter")
ctx.addMutableState(rowWriterClass, rowWriter,
s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
s"$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")

val resetWriter = if (isTopLevel) {
// For top level row writer, it always writes to the beginning of the global buffer holder,
Expand Down Expand Up @@ -182,7 +182,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val arrayWriterClass = classOf[UnsafeArrayWriter].getName
val arrayWriter = ctx.freshName("arrayWriter")
ctx.addMutableState(arrayWriterClass, arrayWriter,
s"this.$arrayWriter = new $arrayWriterClass();")
s"$arrayWriter = new $arrayWriterClass();")
val numElements = ctx.freshName("numElements")
val index = ctx.freshName("index")
val element = ctx.freshName("element")
Expand Down Expand Up @@ -321,7 +321,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val holder = ctx.freshName("holder")
val holderClass = classOf[BufferHolder].getName
ctx.addMutableState(holderClass, holder,
s"this.$holder = new $holderClass($result, ${numVarLenFields * 32});")
s"$holder = new $holderClass($result, ${numVarLenFields * 32});")

val resetBufferHolder = if (numVarLenFields == 0) {
""
Expand Down Expand Up @@ -402,6 +402,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
${eval.code.trim}
return ${eval.value};
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}
"""

Expand Down
Loading