-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-23032][SQL] Add a per-query codegenStageId to WholeStageCodegenExec #20224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package org.apache.spark.sql.execution | ||
|
||
import java.util.Locale | ||
import java.util.function.Supplier | ||
|
||
import scala.collection.mutable | ||
|
||
|
@@ -414,6 +415,58 @@ object WholeStageCodegenExec { | |
} | ||
} | ||
|
||
object WholeStageCodegenId { | ||
// codegenStageId: ID for codegen stages within a query plan. | ||
// It does not affect equality, nor does it participate in destructuring pattern matching | ||
// of WholeStageCodegenExec. | ||
// | ||
// This ID is used to help differentiate between codegen stages. It is included as a part | ||
// of the explain output for physical plans, e.g. | ||
// | ||
// == Physical Plan == | ||
// *(5) SortMergeJoin [x#3L], [y#9L], Inner | ||
// :- *(2) Sort [x#3L ASC NULLS FIRST], false, 0 | ||
// : +- Exchange hashpartitioning(x#3L, 200) | ||
// : +- *(1) Project [(id#0L % 2) AS x#3L] | ||
// : +- *(1) Filter isnotnull((id#0L % 2)) | ||
// : +- *(1) Range (0, 5, step=1, splits=8) | ||
// +- *(4) Sort [y#9L ASC NULLS FIRST], false, 0 | ||
// +- Exchange hashpartitioning(y#9L, 200) | ||
// +- *(3) Project [(id#6L % 2) AS y#9L] | ||
// +- *(3) Filter isnotnull((id#6L % 2)) | ||
// +- *(3) Range (0, 5, step=1, splits=8) | ||
// | ||
// where the ID makes it obvious that not all adjacent codegen'd plan operators are of the | ||
// same codegen stage. | ||
// | ||
// The codegen stage ID is also optionally included in the name of the generated classes as | ||
// a suffix, so that it's easier to associate a generated class back to the physical operator. | ||
// This is controlled by SQLConf: spark.sql.codegen.useIdInClassName | ||
// | ||
// The ID is also included in various log messages. | ||
// | ||
// Within a query, a codegen stage in a plan starts counting from 1, in "insertion order". | ||
// WholeStageCodegenExec operators are inserted into a plan in depth-first post-order. | ||
// See CollapseCodegenStages.insertWholeStageCodegen for the definition of insertion order. | ||
// | ||
// 0 is reserved as a special ID value to indicate a temporary WholeStageCodegenExec object | ||
// is created, e.g. for special fallback handling when an existing WholeStageCodegenExec | ||
// failed to generate/compile code. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should describe about the usage of such codegen stage id, e.g., the codegen stage id would show up in explain string and generated class name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure thing. Will address it in the next update. Thanks! |
||
private val codegenStageCounter = ThreadLocal.withInitial(new Supplier[Integer] { | ||
override def get() = 1 // TODO: change to Scala lambda syntax when upgraded to Scala 2.12+ | ||
}) | ||
|
||
def resetPerQuery(): Unit = codegenStageCounter.set(1) | ||
|
||
def getNextStageId(): Int = { | ||
val counter = codegenStageCounter | ||
val id = counter.get() | ||
counter.set(id + 1) | ||
id | ||
} | ||
} | ||
|
||
/** | ||
* WholeStageCodegen compiles a subtree of plans that support codegen together into single Java | ||
* function. | ||
|
@@ -442,7 +495,8 @@ object WholeStageCodegenExec { | |
* `doCodeGen()` will create a `CodeGenContext`, which will hold a list of variables for input, | ||
* used to generated code for [[BoundReference]]. | ||
*/ | ||
case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport { | ||
case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) | ||
extends UnaryExecNode with CodegenSupport { | ||
|
||
override def output: Seq[Attribute] = child.output | ||
|
||
|
@@ -454,6 +508,12 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co | |
"pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext, | ||
WholeStageCodegenExec.PIPELINE_DURATION_METRIC)) | ||
|
||
def generatedClassName(): String = if (conf.wholeStageUseIdInClassName) { | ||
s"GeneratedIteratorForCodegenStage$codegenStageId" | ||
} else { | ||
"GeneratedIterator" | ||
} | ||
|
||
/** | ||
* Generates code for this subtree. | ||
* | ||
|
@@ -471,19 +531,23 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co | |
} | ||
""", inlineToOuterClass = true) | ||
|
||
val className = generatedClassName() | ||
|
||
val source = s""" | ||
public Object generate(Object[] references) { | ||
return new GeneratedIterator(references); | ||
return new $className(references); | ||
} | ||
|
||
${ctx.registerComment(s"""Codegend pipeline for\n${child.treeString.trim}""")} | ||
final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { | ||
${ctx.registerComment( | ||
s"""Codegend pipeline for stage (id=$codegenStageId) | ||
|${this.treeString.trim}""".stripMargin)} | ||
final class $className extends ${classOf[BufferedRowIterator].getName} { | ||
|
||
private Object[] references; | ||
private scala.collection.Iterator[] inputs; | ||
${ctx.declareMutableStates()} | ||
|
||
public GeneratedIterator(Object[] references) { | ||
public $className(Object[] references) { | ||
this.references = references; | ||
} | ||
|
||
|
@@ -516,7 +580,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co | |
} catch { | ||
case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback => | ||
// We should already saw the error message | ||
logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString") | ||
logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString") | ||
return child.execute() | ||
} | ||
|
||
|
@@ -525,7 +589,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co | |
logInfo(s"Found too long generated codes and JIT optimization might not work: " + | ||
s"the bytecode size ($maxCodeSize) is above the limit " + | ||
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " + | ||
s"for this plan. To avoid this, you can raise the limit " + | ||
s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " + | ||
s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString") | ||
child match { | ||
// The fallback solution of batch file source scan still uses WholeStageCodegenExec | ||
|
@@ -603,10 +667,12 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co | |
verbose: Boolean, | ||
prefix: String = "", | ||
addSuffix: Boolean = false): StringBuilder = { | ||
child.generateTreeString(depth, lastChildren, builder, verbose, "*") | ||
child.generateTreeString(depth, lastChildren, builder, verbose, s"*($codegenStageId) ") | ||
} | ||
|
||
override def needStopCheck: Boolean = true | ||
|
||
override protected def otherCopyArgs: Seq[AnyRef] = Seq(codegenStageId.asInstanceOf[Integer]) | ||
} | ||
|
||
|
||
|
@@ -657,13 +723,14 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { | |
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] => | ||
plan.withNewChildren(plan.children.map(insertWholeStageCodegen)) | ||
case plan: CodegenSupport if supportCodegen(plan) => | ||
WholeStageCodegenExec(insertInputAdapter(plan)) | ||
WholeStageCodegenExec(insertInputAdapter(plan))(WholeStageCodegenId.getNextStageId()) | ||
case other => | ||
other.withNewChildren(other.children.map(insertWholeStageCodegen)) | ||
} | ||
|
||
def apply(plan: SparkPlan): SparkPlan = { | ||
if (conf.wholeStageEnabled) { | ||
WholeStageCodegenId.resetPerQuery() | ||
insertWholeStageCodegen(plan) | ||
} else { | ||
plan | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we disable codegen stage id in both explain result and generated class name at the same time? It seems not be useful if we disable it in class name but keep it in explain result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's always good to have id in explain and generated classes. The only concern is we may have codegen cache issues if putting id in the class name, so we need a config to turn it off.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense to me.