[SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit #16648

Closed
wants to merge 6 commits into from
4 changes: 2 additions & 2 deletions pom.xml
@@ -2005,7 +2005,7 @@
<include>**/*Suite.java</include>
</includes>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-<argLine>-Xmx3g -Xss4096k -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
+<argLine>-Xmx4g -Xss4096k -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
<environmentVariables>
<!--
Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
@@ -2054,7 +2054,7 @@
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>SparkTestSuite.txt</filereports>
-<argLine>-ea -Xmx3g -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
+<argLine>-ea -Xmx4g -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
<stderr/>
<environmentVariables>
<!--
7 changes: 7 additions & 0 deletions sql/catalyst/pom.xml
@@ -131,6 +131,13 @@
</execution>
</executions>
</plugin>
+<plugin>
+<groupId>org.scalatest</groupId>
+<artifactId>scalatest-maven-plugin</artifactId>
+<configuration>
+<argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+</configuration>
+</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
@@ -67,14 +67,15 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterminis
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val countTerm = ctx.freshName("count")
val partitionMaskTerm = ctx.freshName("partitionMask")
-ctx.addMutableState(ctx.JAVA_LONG, countTerm, "")
-ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm, "")
-ctx.addPartitionInitializationStatement(s"$countTerm = 0L;")
-ctx.addPartitionInitializationStatement(s"$partitionMaskTerm = ((long) partitionIndex) << 33;")
+val countTermAccessor = ctx.addMutableState(ctx.JAVA_LONG, countTerm, "")
+val partitionMaskTermAccessor = ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm, "")
+ctx.addPartitionInitializationStatement(s"$countTermAccessor = 0L;")
+ctx.addPartitionInitializationStatement(
+s"$partitionMaskTermAccessor = ((long) partitionIndex) << 33;")

ev.copy(code = s"""
-final ${ctx.javaType(dataType)} ${ev.value} = $partitionMaskTerm + $countTerm;
-$countTerm++;""", isNull = "false")
+final ${ctx.javaType(dataType)} ${ev.value} = $partitionMaskTermAccessor + $countTermAccessor;
+$countTermAccessor++;""", isNull = "false")

The Accessor suffixes to variable names add quite a bit of noise in this PR. What value do they add from your perspective?

Author

Having addMutableState return an accessor string is an important part of addressing how mutable state can contribute to Constant Pool errors. Code that creates mutable state usually takes for granted that the symbol used to declare the state will be inlined as a private member variable of the class. However, for sufficiently complicated schemas, mutable state and its initialization alone can breach the Constant Pool limit. The strategy I settled on was to have mutable state potentially be compacted into arrays of like type and initialization; this way, we reduce the number of references that count toward the constant pool limit. Of course, if the mutable state is stored in an array, rather than in a private variable named after the symbol, addMutableState needs to return the accessor for that index in the compacted mutable state array, hence the 'Accessor' suffixes. I had also tried a class-based approach, in which excess mutable state could become static members of nested classes, but the initialization functions for the state could still exceed the constant pool limit.
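
To make the accessor idea concrete, here is a minimal sketch in Scala. `ToyCodegenContext`, the `mutableState_<type>` array naming, and the rule "compact only primitives with empty initialization" are all hypothetical simplifications for illustration; this is not Spark's CodegenContext and not necessarily how this PR implements compaction.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for CodegenContext (not Spark's actual class).
class ToyCodegenContext {
  private val primitiveTypes =
    Set("boolean", "byte", "short", "int", "long", "float", "double")

  // javaType -> number of slots handed out so far in that type's compacted array
  private val arraySlots = mutable.LinkedHashMap.empty[String, Int]
  // State that stays inlined as individual member variables: (javaType, name, initCode)
  private val inlined = mutable.ArrayBuffer.empty[(String, String, String)]

  // Assumed naming scheme for the compacted arrays.
  private def arrayName(javaType: String): String = s"mutableState_$javaType"

  /** Registers state and returns the expression callers must use to read or write it. */
  def addMutableState(javaType: String, name: String, initCode: String): String = {
    if (primitiveTypes.contains(javaType) && initCode.isEmpty) {
      // Compacted: the caller gets an array element, not the symbol it declared.
      val idx = arraySlots.getOrElse(javaType, 0)
      arraySlots(javaType) = idx + 1
      s"${arrayName(javaType)}[$idx]"
    } else {
      // Not compacted: the accessor is simply the declared name.
      inlined += ((javaType, name, initCode))
      name
    }
  }

  /** Java field declarations for everything registered so far. */
  def declareMutableStates(): String = {
    val arrays = arraySlots.toSeq.map { case (t, n) =>
      s"private $t[] ${arrayName(t)} = new $t[$n];"
    }
    val fields = inlined.toSeq.map { case (t, n, _) => s"private $t $n;" }
    (arrays ++ fields).mkString("\n")
  }
}

object AccessorDemo {
  def main(args: Array[String]): Unit = {
    val ctx = new ToyCodegenContext
    val count = ctx.addMutableState("long", "count", "")
    val mask = ctx.addMutableState("long", "partitionMask", "")
    val udf = ctx.addMutableState(
      "scala.Function1", "udf", "udf = (scala.Function1) references[0];")
    println(ctx.declareMutableStates())
    println(s"final long value = $mask + $count; $count++;")
    println(s"$udf.apply(arg);")
  }
}
```

In this sketch the two `long` states come back as `mutableState_long[0]` and `mutableState_long[1]`, while the UDF state comes back under its declared name. A caller that kept using the original symbol for a compacted state would emit Java that refers to a field that no longer exists, which is why call sites have to switch to the returned accessor.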

This PR boils down to two core components for working around the (hard-and-fast) Constant Pool limit:

  • split excess code among classes
  • compact excess mutable state into arrays

I should mention that not all mutable state is compacted into arrays: only primitives and collections of simply-assigned objects (null-assigned, or with no assignment). Still, this array compaction strategy reduces references enough that even complex schemas, for which we would potentially generate far more than 2^16 pieces of state, can still be converted to datasets.
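
For a rough sense of scale: the class file format stores `constant_pool_count` in an unsigned 16-bit field, which is where the 2^16 bound comes from. The back-of-the-envelope sketch below assumes that each separately named field touched by generated code costs about three pool entries of its own (a Fieldref, a NameAndType, and a Utf8 for the name), while one array field pays that cost once regardless of its length. The per-field figure and the helper names are illustrative assumptions, not measurements from this PR, and the estimate ignores integer constants needed for large array indices.

```scala
object ConstantPoolEstimate {
  // constant_pool_count is a u2 in the class file format, so at most 65535 entries.
  val MaxPoolEntries: Int = 0xFFFF

  // Assumed cost per distinct field: Fieldref + NameAndType + Utf8 name.
  val EntriesPerField: Int = 3

  /** Estimated entries when every state is its own member variable. */
  def inlinedFieldsCost(numStates: Int): Long =
    numStates.toLong * EntriesPerField

  /** Estimated entries when states are compacted into one array per Java type. */
  def compactedCost(numTypes: Int): Long =
    numTypes.toLong * EntriesPerField

  def main(args: Array[String]): Unit = {
    val states = 30000
    println(s"inlined:   ${inlinedFieldsCost(states)} entries (limit $MaxPoolEntries)")
    println(s"compacted: ${compactedCost(1)} entries")
  }
}
```

Under these assumptions, 30000 inlined states of one type need roughly 90000 entries, already past the cap before any other constants are counted, whereas the compacted form needs a handful.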

}

override def prettyName: String = "monotonically_increasing_id"
@@ -987,11 +987,11 @@ case class ScalaUDF(

val converterTerm = ctx.freshName("converter")
val expressionIdx = ctx.references.size - 1
-ctx.addMutableState(converterClassName, converterTerm,
-s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
+val converterTermAccessor = ctx.addMutableState(converterClassName, converterTerm,
+s"$converterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
s"references[$expressionIdx]).getChildren().apply($index))).dataType());")
-converterTerm
+converterTermAccessor
}

override def doGenCode(
@@ -1004,8 +1004,9 @@ case class ScalaUDF(

// Generate codes used to convert the returned value of user-defined functions to Catalyst type
val catalystConverterTerm = ctx.freshName("catalystConverter")
-ctx.addMutableState(converterClassName, catalystConverterTerm,
-s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
+val catalystTermAccessor =
+ctx.addMutableState(converterClassName, catalystConverterTerm,
+s"$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToCatalystConverter($scalaUDF.dataType());")

val resultTerm = ctx.freshName("result")
@@ -1018,8 +1019,8 @@ case class ScalaUDF(
val funcClassName = s"scala.Function${children.size}"

val funcTerm = ctx.freshName("udf")
-ctx.addMutableState(funcClassName, funcTerm,
-s"this.$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
+val funcTermAccessor = ctx.addMutableState(funcClassName, funcTerm,
+s"$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")

// codegen for children expressions
val evals = children.map(_.genCode(ctx))
@@ -1036,12 +1037,12 @@ case class ScalaUDF(
(convert, argTerm)
}.unzip

-val getFuncResult = s"$funcTerm.apply(${funcArguments.mkString(", ")})"
+val getFuncResult = s"$funcTermAccessor.apply(${funcArguments.mkString(", ")})"
val callFunc =
s"""
${ctx.boxedType(dataType)} $resultTerm = null;
try {
-$resultTerm = (${ctx.boxedType(dataType)})$catalystConverterTerm.apply($getFuncResult);
+$resultTerm = (${ctx.boxedType(dataType)})$catalystTermAccessor.apply($getFuncResult);
} catch (Exception e) {
throw new org.apache.spark.SparkException($scalaUDF.udfErrorMessage(), e);
}
@@ -44,8 +44,9 @@ case class SparkPartitionID() extends LeafExpression with Nondeterministic {

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val idTerm = ctx.freshName("partitionId")
-ctx.addMutableState(ctx.JAVA_INT, idTerm, "")
-ctx.addPartitionInitializationStatement(s"$idTerm = partitionIndex;")
-ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = $idTerm;", isNull = "false")
+val idTermAccessor = ctx.addMutableState(ctx.JAVA_INT, idTerm, "")
+ctx.addPartitionInitializationStatement(s"$idTermAccessor = partitionIndex;")
+ev.copy(
+code = s"final ${ctx.javaType(dataType)} ${ev.value} = $idTermAccessor;", isNull = "false")
}
}