Skip to content

Commit 1b27080

Browse files
committed
Fix
1 parent 962e330 commit 1b27080

File tree

7 files changed

+60
-34
lines changed

7 files changed

+60
-34
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,6 +1211,20 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
12111211
}
12121212
}
12131213

1214+
/**
1215+
* Java bytecode statistics of a compiled class by Janino.
1216+
*/
1217+
case class ByteCodeStats(maxClassCodeSize: Int, maxMethodCodeSize: Int, maxConstPoolSize: Int)
1218+
1219+
object ByteCodeStats {
1220+
1221+
val unavailable = ByteCodeStats(-1, -1, -1)
1222+
1223+
def apply(codeStats: (Int, Int, Int)): ByteCodeStats = {
1224+
ByteCodeStats(codeStats._1, codeStats._2, codeStats._3)
1225+
}
1226+
}
1227+
12141228
object CodeGenerator extends Logging {
12151229

12161230
// This is the default value of HugeMethodLimit in the OpenJDK HotSpot JVM,
@@ -1244,7 +1258,7 @@ object CodeGenerator extends Logging {
12441258
*
12451259
* @return a pair of a generated class and the max bytecode size of generated functions.
12461260
*/
1247-
def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
1261+
def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
12481262
cache.get(code)
12491263
} catch {
12501264
// Cache.get() may wrap the original exception. See the following URL
@@ -1257,7 +1271,7 @@ object CodeGenerator extends Logging {
12571271
/**
12581272
* Compile the Java source code into a Java class, using Janino.
12591273
*/
1260-
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
1274+
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
12611275
val evaluator = new ClassBodyEvaluator()
12621276

12631277
// A special classloader used to wrap the actual parent classloader of
@@ -1318,10 +1332,11 @@ object CodeGenerator extends Logging {
13181332
}
13191333

13201334
/**
1321-
* Returns the max bytecode size of the generated functions by inspecting janino private fields.
1322-
* Also, this method updates the metrics information.
1335+
* Returns the bytecode statistics (max class bytecode size, max method bytecode size, and
1336+
* max constant pool size) of generated classes by inspecting janino private fields.inspecting
1337+
* janino private fields. Also, this method updates the metrics information.
13231338
*/
1324-
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
1339+
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): ByteCodeStats = {
13251340
// First retrieve the generated classes.
13261341
val classes = {
13271342
val resultField = classOf[SimpleCompiler].getDeclaredField("result")
@@ -1336,11 +1351,13 @@ object CodeGenerator extends Logging {
13361351
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
13371352
val codeAttrField = codeAttr.getDeclaredField("code")
13381353
codeAttrField.setAccessible(true)
1339-
val codeSizes = classes.flatMap { case (_, classBytes) =>
1340-
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
1354+
val codeStats = classes.map { case (_, classBytes) =>
1355+
val classCodeSize = classBytes.length
1356+
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classCodeSize)
13411357
try {
13421358
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
1343-
val stats = cf.methodInfos.asScala.flatMap { method =>
1359+
val constPoolSize = cf.getConstantPoolSize
1360+
val methodCodeSizes = cf.methodInfos.asScala.flatMap { method =>
13441361
method.getAttributes().filter(_.getClass eq codeAttr).map { a =>
13451362
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
13461363
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
@@ -1353,19 +1370,17 @@ object CodeGenerator extends Logging {
13531370
byteCodeSize
13541371
}
13551372
}
1356-
Some(stats)
1373+
(classCodeSize, methodCodeSizes.max, constPoolSize)
13571374
} catch {
13581375
case NonFatal(e) =>
13591376
logWarning("Error calculating stats of compiled class.", e)
1360-
None
1377+
(classCodeSize, -1, -1)
13611378
}
1362-
}.flatten
1363-
1364-
if (codeSizes.nonEmpty) {
1365-
codeSizes.max
1366-
} else {
1367-
0
13681379
}
1380+
1381+
ByteCodeStats(codeStats.reduce[(Int, Int, Int)] { case (v1, v2) =>
1382+
(Math.max(v1._1, v2._1), Math.max(v1._2, v2._2), Math.max(v1._3, v2._3))
1383+
})
13691384
}
13701385

13711386
/**
@@ -1380,8 +1395,8 @@ object CodeGenerator extends Logging {
13801395
private val cache = CacheBuilder.newBuilder()
13811396
.maximumSize(SQLConf.get.codegenCacheMaxEntries)
13821397
.build(
1383-
new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
1384-
override def load(code: CodeAndComment): (GeneratedClass, Int) = {
1398+
new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() {
1399+
override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
13851400
val startTime = System.nanoTime()
13861401
val result = doCompile(code)
13871402
val endTime = System.nanoTime()

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
2525
import org.apache.spark.sql.{AnalysisException, SparkSession}
2626
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
2727
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
28+
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
2829
import org.apache.spark.sql.catalyst.plans.QueryPlan
2930
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
3031
import org.apache.spark.sql.catalyst.rules.Rule
@@ -213,7 +214,7 @@ class QueryExecution(
213214
*
214215
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
215216
*/
216-
def codegenToSeq(): Seq[(String, String)] = {
217+
def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {
217218
org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
218219
}
219220

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
688688
override def doExecute(): RDD[InternalRow] = {
689689
val (ctx, cleanedSource) = doCodeGen()
690690
// try to compile and fallback if it failed
691-
val (_, maxCodeSize) = try {
691+
val (_, compiledCodeStats) = try {
692692
CodeGenerator.compile(cleanedSource)
693693
} catch {
694694
case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
@@ -698,9 +698,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
698698
}
699699

700700
// Check if compiled code has a too large function
701-
if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
701+
if (compiledCodeStats.maxMethodCodeSize > sqlContext.conf.hugeMethodLimit) {
702702
logInfo(s"Found too long generated codes and JIT optimization might not work: " +
703-
s"the bytecode size ($maxCodeSize) is above the limit " +
703+
s"the bytecode size (${compiledCodeStats.maxMethodCodeSize}) is above the limit " +
704704
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
705705
s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
706706
s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")

sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ package org.apache.spark.sql.execution
2020
import java.util.Collections
2121

2222
import scala.collection.JavaConverters._
23+
import scala.util.control.NonFatal
2324

2425
import org.apache.spark.broadcast.Broadcast
2526
import org.apache.spark.internal.Logging
2627
import org.apache.spark.rdd.RDD
2728
import org.apache.spark.sql._
2829
import org.apache.spark.sql.catalyst.InternalRow
2930
import org.apache.spark.sql.catalyst.expressions.Attribute
30-
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
31+
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeFormatter, CodegenContext, CodeGenerator, ExprCode}
3132
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
3233
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
3334
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
@@ -81,11 +82,14 @@ package object debug {
8182
def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
8283
val codegenSeq = codegenStringSeq(plan)
8384
append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
84-
for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
85-
append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
85+
for (((subtree, code, codeStats), i) <- codegenSeq.zipWithIndex) {
86+
val codeStatsStr = s"maxClassCodeSize:${codeStats.maxClassCodeSize} " +
87+
s"maxMethodCodeSize:${codeStats.maxMethodCodeSize} " +
88+
s"maxConstantPoolSize:${codeStats.maxConstPoolSize}"
89+
append(s"== Subtree ${i + 1} / ${codegenSeq.size} ($codeStatsStr) ==\n")
8690
append(subtree)
8791
append("\nGenerated code:\n")
88-
append(s"${code}\n")
92+
append(s"$code\n")
8993
}
9094
}
9195

@@ -95,7 +99,7 @@ package object debug {
9599
* @param plan the query plan for codegen
96100
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
97101
*/
98-
def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = {
102+
def codegenStringSeq(plan: SparkPlan): Seq[(String, String, ByteCodeStats)] = {
99103
val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()
100104
plan transform {
101105
case s: WholeStageCodegenExec =>
@@ -105,7 +109,13 @@ package object debug {
105109
}
106110
codegenSubtrees.toSeq.map { subtree =>
107111
val (_, source) = subtree.doCodeGen()
108-
(subtree.toString, CodeFormatter.format(source))
112+
val codeStats = try {
113+
CodeGenerator.compile(source)._2
114+
} catch {
115+
case NonFatal(_) =>
116+
ByteCodeStats.unavailable
117+
}
118+
(subtree.toString, CodeFormatter.format(source), codeStats)
109119
}
110120
}
111121

@@ -130,7 +140,7 @@ package object debug {
130140
* @param query the streaming query for codegen
131141
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
132142
*/
133-
def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
143+
def codegenStringSeq(query: StreamingQuery): Seq[(String, String, ByteCodeStats)] = {
134144
val w = asStreamExecution(query)
135145
if (w.lastExecution != null) {
136146
codegenStringSeq(w.lastExecution.executedPlan)

sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
21-
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
21+
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
2222
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
2323
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
2424
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -213,10 +213,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession {
213213

214214
ignore("SPARK-21871 check if we can get large code size when compiling too long functions") {
215215
val codeWithShortFunctions = genGroupByCode(3)
216-
val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
216+
val (_, ByteCodeStats(_, maxCodeSize1, _)) = CodeGenerator.compile(codeWithShortFunctions)
217217
assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
218218
val codeWithLongFunctions = genGroupByCode(50)
219-
val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
219+
val (_, ByteCodeStats(_, maxCodeSize2, _)) = CodeGenerator.compile(codeWithLongFunctions)
220220
assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
221221
}
222222

sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class DebuggingSuite extends SharedSparkSession {
4646
val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
4747
.queryExecution.executedPlan)
4848
assert(res.length == 2)
49-
assert(res.forall{ case (subtree, code) =>
49+
assert(res.forall{ case (subtree, code, _) =>
5050
subtree.contains("Range") && code.contains("Object[]")})
5151
}
5252

sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
100100
val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
101101
.queryExecution.executedPlan)
102102
assert(res.length == 2)
103-
assert(res.forall { case (_, code) =>
103+
assert(res.forall { case (_, code, _) =>
104104
(code.contains("* Codegend pipeline") == flag) &&
105105
(code.contains("// input[") == flag)
106106
})

0 commit comments

Comments
 (0)