[SPARK-23032][SQL] Add a per-query codegenStageId to WholeStageCodegenExec #20224
Conversation
Force-pushed from 18a3860 to fa25f72 (Compare)
jenkins retest this please
One comment as to the constructor: I considered using `case class WholeStageCodegenExec(child: SparkPlan)(private val codegenStageId: Int) extends UnaryExecNode with CodegenSupport` and then explicitly threading the ID through plan construction. However, that turned out to be much more complicated than just using a per-query counter.
Test build #85940 has finished for PR 20224.
I totally agree with adding a unique ID to each generated class. Currently, all of the code generated by whole-stage codegen has the same class name, which makes it hard to debug in a production environment. On the other hand, IIUC, the current implementation disables the caching mechanism for the same query in Spark, since adding a unique ID generates a different string for the Java code. WDYT? I am thinking about adding an ID related to a task into a comment or other parts.
Do we always need to turn this on? It seems like this is debug info for developers?
Thanks for your comments and questions, @kiszk and @maropu!

tl;dr: On top of my original proposal in the PR description / JIRA ticket, I'd like to further add a config option for whether to embed the codegen stage ID into the generated class name, and to reserve the ID value 0 for temporary ("free-floating") `WholeStageCodegenExec` objects. Side note: putting the codegen stage ID into the generated code as a comment (along the lines of @kiszk's suggestion) could also be worth doing.

Detail Discussions

My proposal and PR currently do 3 things:

1. introduce a per-query codegen stage ID, assigned as `CollapseCodegenStages` inserts `WholeStageCodegenExec` operators into the plan;
2. show that ID in the explain output of the physical plan;
3. embed that ID into the name of the generated class.
Of the above, (1) is the fundamental part, while (2) and (3) are separate applications of the information from (1).

Would you (@kiszk and @maropu) agree that at least having both (1) and (2) is a good idea? They don't interact with anything else at runtime, so there's no behavioral change or performance implication because of them. They can be always turned on with minimal overhead. @rxin did point out that our current explain output for physical plans is already pretty cluttered and not user-friendly enough, so it makes sense to have a "verbose mode" in the future and then make the default mode less cluttered. But that's out of scope for this change.

For (3), @kiszk does point out that there's an interaction between the generated code (in source string + comments form) and the codegen cache (which uses the generated source code as its key). This PR proposes an ID unique within a query. If the same query is run multiple times, it'll generate the exact same code (with the IDs included), so at least with the current implementation, we can guarantee that there won't be redundant compilation for multiple runs of the same query. I mentioned this in the PR description: the per-query ID is stable within a query, so multiple runs of the same query see the same resulting IDs, which plays well with the codegen cache.
Within a query, though, before this change there could have been codegen stages that happen to have the exact same source code, and thus would work well with the codegen cache. After this change, such cases would end up generating code with different IDs embedded into the class names, so they'll have different source code, won't hit the codegen cache, and would have to be compiled separately. Here's an example that would hit this case:
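(The concrete example was lost from this transcript; the following is a hypothetical reconstruction with the same shape: a UNION ALL whose first and third branches are textually identical, so their codegen stages produce the same source text before this change.)

```scala
// Hypothetical stand-in for the missing example: branches 1 and 3 are
// identical, so before this change their codegen stages generate the
// exact same source code and share one compiled class via the cache.
val df = spark.sql("""
  SELECT id FROM range(3)
  UNION ALL
  SELECT id * 2 AS id FROM range(3)
  UNION ALL
  SELECT id FROM range(3)
""")
df.explain()
```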
With this change, you can see the different codegen stages as follows:
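(Again a hypothetical sketch of the output; exprIds and formatting are illustrative:)

```
== Physical Plan ==
Union
:- *(1) Range (0, 3, step=1, splits=8)
:- *(2) Project [(id#0L * 2) AS id#2L]
:  +- *(2) Range (0, 3, step=1, splits=8)
+- *(3) Range (0, 3, step=1, splits=8)
```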
The generated code for codegen stages (1) and (3) is actually identical, so the codegen cache will save us from redundantly compiling one of the stages. I would argue that this kind of codegen cache hit is accidental, brittle, and shouldn't be considered a must-have. On the other hand, having such IDs in the class name is very useful for all kinds of diagnosis, not just for Spark developers. But to avoid having any regressions whatsoever, I do agree we should have a config option to choose whether or not to embed this codegen stage ID into the generated class name.
Thanks for your comments, @viirya! I'd say only having (1) and (2) makes it much less useful than having all 3, but it's still useful on its own for helping people understand exactly which physical operators were fused into a single codegen stage (as opposed to assuming adjacent codegen'd operators are always in the same codegen stage). With (3) and the new proposal of reserving the ID value 0 for temporary objects, the ID becomes useful for runtime diagnosis as well. Thanks!
As a high-level comment, adding IDs helps performance/error diagnosis in production environments. I strongly support always enabling this.
```diff
@@ -312,6 +313,24 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
 object WholeStageCodegenExec {
   val PIPELINE_DURATION_METRIC = "duration"

+  private val codegenStageCounter = ThreadLocal.withInitial(new Supplier[Integer] {
```
Instead of adding this logic into `object WholeStageCodegenExec`, could we just create a new object?
But IMHO this is the place to put it, since it's closely tied to the initialization of `WholeStageCodegenExec` object instances. Are you suggesting something like an `object WholeStageCodegenId` and moving the newly added methods there instead? That would work too.
Yeah. Creating a new `object WholeStageCodegenId` is better.
Overall, the proposal looks good to me. We need a test case in WholeStageCodegenSuite.
```diff
-    insertWholeStageCodegen(plan)
+    WholeStageCodegenExec.initializeCodegenStageCounterPerQuery()
+    val newPlan = insertWholeStageCodegen(plan)
+    WholeStageCodegenExec.resetCodegenStageCounter()
```
We need to reset the counter? Is it a bad idea to use just a simple incremental counter like `exprId`?
It's exactly the concern that @kiszk brought up: the codegen cache uses the generated source code as the key, so any difference in the source code text would break the cache hit.

Imagine the counter were globally (or Spark session-local globally) atomically incrementing; then if the same query were run twice, the codegen stages in those two runs would actually get different sets of IDs, resulting in different source code text (everything's the same except for the ID), and that would render the codegen cache useless -- basically nothing would ever hit the cache since the ID would always be different.

In fact, you'd find that the IDs in the explain output through `df.explain()` would be different from the ones you see in the Spark UI SQL tab's treeString, because explain is actually "one query with the `ExplainCommand` as the root". I had hit this exact problem in my early prototype and soon realized it wasn't going to be user-friendly.

By making the ID only increment within a query, we can make sure the codegen cache works for multiple runs of the same (or identically structured) query, and still be able to differentiate the codegen stages within a query.
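To make the mechanics concrete, here is a minimal sketch of the per-query counter; the method names follow the diff fragments quoted above, but the bodies are a reconstruction, not the PR's exact code:

```scala
import java.util.function.Supplier

object WholeStageCodegenExec {
  // Thread-local so that queries planned concurrently on different
  // threads don't interleave their IDs. 0 is the idle value, reserved
  // for WholeStageCodegenExec objects created outside CollapseCodegenStages.
  private val codegenStageCounter: ThreadLocal[Integer] =
    ThreadLocal.withInitial(new Supplier[Integer] {
      override def get(): Integer = 0
    })

  // Called at the start of CollapseCodegenStages, so every run of the
  // same query numbers its stages 1, 2, 3, ... identically.
  def initializeCodegenStageCounterPerQuery(): Unit = codegenStageCounter.set(1)

  // Called once the codegen stages have been inserted into the plan.
  def resetCodegenStageCounter(): Unit = codegenStageCounter.set(0)

  def getNextStageId(): Int = {
    val id = codegenStageCounter.get()
    codegenStageCounter.set(id + 1)
    id
  }
}
```

Because the counter restarts at 1 for every planning run, repeated runs of the same query produce the same IDs, and therefore the same source text for the codegen cache.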
Thanks @gatorsmile! Will add a new test case in WholeStageCodegenSuite.
Force-pushed from fa25f72 to a0162aa (Compare)
I've updated the PR addressing @gatorsmile's comments: moved the new utility code to a new `object WholeStageCodegenId`. I've also added a config flag in SQLConf (`spark.sql.codegen.wholeStage.useIdInClassName`) to choose whether to embed the codegen stage ID into the generated class name. Also added a new test case to WholeStageCodegenSuite. ping @gatorsmile @kiszk @maropu @viirya to have a second look. Thanks!
Force-pushed from a0162aa to a7ceda2 (Compare)
Test build #86516 has finished for PR 20224.
Test build #86515 has finished for PR 20224.
jenkins retest this please
Test build #86521 has finished for PR 20224.
also ping @cloud-fan
```scala
spark.range(5).select('id * 2).collect
val after3 = bytecodeSizeHisto.getCount
assert(after3 >= after2, "a different query can result in codegen cache miss, that's okay")
```
`after3 > after2`?
I actually deliberately wrote it this way. Note how I phrased the assertion message as "can result in codegen cache miss" instead of "will result in".

That's because the code shape of this third query was deliberately chosen to be similar to the two queries before it: all three have `spark.range(some_const).select(some_expr).collect`, so if any future changes to codegen of the `Range` or `Project` operators affect how much specialized code (such as constant values) we directly embed into the generated code, it's actually possible for this third query to generate the same code as the first two, which would result in a codegen cache hit -- the bytecode-compiled method count would therefore be the same.

So I'm making this check a bit loose. It's just there to indicate that it's acceptable for a different query to encounter a codegen cache miss. WDYT?
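For context, here is a sketch of the whole test's shape, reconstructed from the fragments quoted in this thread (the first two queries and the imports are assumptions; only the third query and the assertion messages appear verbatim above):

```scala
import org.apache.spark.metrics.source.CodegenMetrics
import spark.implicits._ // for the 'id symbol-to-Column syntax

// Histogram of compiled method bytecode sizes; its count only ever grows.
val bytecodeSizeHisto = CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE

// Run the same (hypothetical) query twice: identical source text means
// the second run should hit the codegen cache and compile nothing new.
spark.range(3).select('id + 1).collect
val after1 = bytecodeSizeHisto.getCount
spark.range(3).select('id + 1).collect
val after2 = bytecodeSizeHisto.getCount
assert(after1 == after2, "the same query run twice should hit the codegen cache")

// A different query can (but need not) miss the cache; since the metric
// is monotonic, >= always holds, so this check is deliberately loose.
spark.range(5).select('id * 2).collect
val after3 = bytecodeSizeHisto.getCount
assert(after3 >= after2, "a different query can result in codegen cache miss, that's okay")
```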
makes sense
```scala
 * is created, e.g. for special fallback handling when an existing WholeStageCodegenExec
 * failed to generate/compile code.
 */
val codegenStageId = WholeStageCodegenId.getNextStageId()
```
My major concern is: what if we transform the physical plan tree after adding `WholeStageCodegenExec`? E.g. during transformation we may copy a plan node, and then a copied `WholeStageCodegenExec` will have a different stage id.
Thanks for your comment, @cloud-fan! That's a nice catch that I hadn't really thought about.

All the examples that I've run with are ones that wouldn't trigger changes to the plan after `CollapseCodegenStages`, which means for those examples `ReuseExchange` / `ReuseSubqueries` wouldn't have triggered.

Yes, these two rules could potentially change the physical plan, which means when we `transformUp` in those rules (and any future rule after `CollapseCodegenStages`), they'd create new `WholeStageCodegenExec` objects outside of `CollapseCodegenStages`, and with my current implementation that'll result in the WSC copies having a codegen stage ID of 0.

One way to work around this is to move `CollapseCodegenStages` to always be the last rule in `org.apache.spark.sql.execution.QueryExecution#preparations`, so that we're sure there's no other transformation on the physical plan that could change the structure of the plan, except for fallback handling that could happen in a couple of `doExecute()`s -- those exception cases are expected, and to me they are acceptable.

If we go down that route, I'll probably have to tweak `CollapseCodegenStages` a little bit so that it can cope with the physical query plan potentially becoming a DAG instead of a tree, as the `ReuseExchange` / `ReuseSubqueries` rules may do that kind of transformation. This tweak is easy to implement and low risk: simply bailing out of transforming a subtree when it sees a `WholeStageCodegenExec` already inserted into the plan would suffice.

^^ scratch that. I'll need something a bit more involved to deal with DAGs in this case.

Let me actually update the PR with this tweak and see what happens in tests.
BTW, inspired by @cloud-fan's comment, here's an example of the codegen stage IDs when scalar subqueries are involved:

```scala
val sub = "(select sum(id) from range(5))"
val df = spark.sql(s"select $sub as a, $sub as b")
df.explain(true)
```

would give:
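(The actual output is missing from this transcript; a hypothetical sketch of the physical-plan portion, with illustrative exprIds and operator details, is:)

```
== Physical Plan ==
*(1) Project [Subquery subquery#9 AS a#10L, Subquery subquery#11 AS b#12L]
:  :- Subquery subquery#9
:  :  +- *(2) HashAggregate(keys=[], functions=[sum(id#0L)])
:  :     +- Exchange SinglePartition
:  :        +- *(1) HashAggregate(keys=[], functions=[partial_sum(id#0L)])
:  :           +- *(1) Range (0, 5, step=1, splits=8)
:  +- Subquery subquery#11
:     +- *(2) HashAggregate(keys=[], functions=[sum(id#4L)])
:        +- Exchange SinglePartition
:           +- *(1) HashAggregate(keys=[], functions=[partial_sum(id#4L)])
:              +- *(1) Range (0, 5, step=1, splits=8)
+- Scan OneRowRelation[]
```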
The reason why the IDs look a bit "odd" (that there are three separate codegen stages with ID 1) is because the main "spine" query and each of the individual subqueries are "planned" separately, so each of them runs `CollapseCodegenStages` -- and thus the per-query counter -- on its own, starting from 1.
Force-pushed from a7ceda2 to e449216 (Compare)
Updated the PR: `WholeStageCodegenExec` now takes the codegen stage ID through a second, curried constructor parameter list, so the ID is preserved when later rules copy the operator, and the places that create temporary `WholeStageCodegenExec` objects explicitly pass 0.

The test case triggers the situation @cloud-fan described, where the plan is transformed again after `CollapseCodegenStages`.

Note the reserved ID value 0 on the temporary objects.
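For intuition, here is a minimal, self-contained sketch (not the PR's code; `Node` and its members are hypothetical stand-ins) of why a curried case-class parameter can survive copy-style rewrites, given Catalyst's `TreeNode.otherCopyArgs` contract:

```scala
// Hypothetical toy model of a Catalyst-style tree node. The codegen
// stage ID lives in a second parameter list, outside the case-class
// "value" part used for equality and pattern matching.
case class Node(children: Seq[Node])(val stageId: Int) {
  // Catalyst's TreeNode.makeCopy appends these extra constructor
  // arguments when reconstructing a node, which is how the curried
  // argument is carried across copies.
  def otherCopyArgs: Seq[AnyRef] = Seq(Int.box(stageId))
}

object CurriedCopyDemo extends App {
  val n = Node(Nil)(42)
  // The synthesized copy(...) only defaults the first parameter list;
  // the second must be supplied again, mimicking what Catalyst does
  // with otherCopyArgs.
  val copied = n.copy(children = Nil)(n.stageId)
  assert(copied.stageId == 42)
  println(s"stageId preserved across copy: ${copied.stageId}")
}
```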
Test build #86612 has finished for PR 20224.
```diff
@@ -629,6 +629,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME =
+    buildConf("spark.sql.codegen.wholeStage.useIdInClassName")
```
nit: It seems other whole-stage codegen configs don't have the `wholeStage` prefix, shall we remove it to make them consistent?
I'm open to suggestions for the config option name. Do you have any concrete suggestions? Looks like you're suggesting `spark.sql.codegen.useIdInClassName`, right?

I chose the current name (prefix) for two reasons:

- the config option right before mine is named `spark.sql.codegen.wholeStage`, and I just used it as a prefix;
- this option only affects whole-stage codegen and not other (expression/predicate/ordering/encoder) codegens.

But you're making a good point that all the other whole-stage codegen config options (the ones following this one) only use `spark.sql.codegen` as the prefix. So if you'd confirm that I understood your suggestion correctly, I'll update the PR to address it.

Thanks!
```diff
@@ -325,6 +326,28 @@ object WholeStageCodegenExec {
   }
 }

+object WholeStageCodegenId {
+  private val codegenStageCounter = ThreadLocal.withInitial(new Supplier[Integer] {
+    override def get() = 0 // TODO: change to Scala lambda syntax when upgraded to Scala 2.12+
```
shall we just use 1 as the initial value and add a comment to say that 0 is reserved for temporary WholeStageCodegenExec objects? Then we only need an `initialize` method.
With the updated PR that uses the secondary constructor in `WholeStageCodegenExec`, yes, you're making a good point. All the places that create temporary `WholeStageCodegenExec` objects are explicitly passing in `0` as the codegen stage ID now, so we can indeed simplify the counter logic here. Will address in the next update.
LGTM
Force-pushed from e449216 to a11232e (Compare)
LGTM, pending jenkins
```scala
spark.range(5).select('id * 2).collect
val after3 = bytecodeSizeHisto.getCount
assert(after3 >= after2, "a different query can result in codegen cache miss, that's okay")
```
nit: "a different query can result in codegen cache miss, that's okay" seems a misleading error message for the assert.
That assert is actually "useless" in the sense that the CodegenMetrics are always monotonically increasing, so `after3 >= after2` will always be true (note the `=` in there). I only put it in to show the intent: it's by design that a different query can cause a codegen cache miss.

Would you have any concrete suggestions for the wording of that assertion message? I'm totally open to suggestions here. I can move the current message into a comment and make the assert message look more like an assert message.
It is an error message, so it's weird to use it to explain this assert. I think you can make it a comment to explain the assert and remove the error message.
ok, seems this assert is not very useful, how about we just remove it? We can just add a comment before `assert(after1 == after2, "the same query run twice should hit the codegen cache")` and say that different queries can cause codegen cache misses, so this assert proves the query is the same.
+1
Thanks for the suggestions, @viirya and @cloud-fan ! I'll move the assert message to a comment in the next update.
```scala
// 0 is reserved as a special ID value to indicate a temporary WholeStageCodegenExec object
// is created, e.g. for special fallback handling when an existing WholeStageCodegenExec
// failed to generate/compile code.
```
I think we should describe the usage of this codegen stage id, e.g., that it shows up in the explain string and in the generated class name.
Sure thing. Will address it in the next update. Thanks!
.doc("When true, embed the (whole-stage) codegen stage ID into " + | ||
"the class name of the generated class as a suffix") | ||
.booleanConf | ||
.createWithDefault(true) |
Shall we disable the codegen stage id in both the explain result and the generated class name at the same time? It doesn't seem useful to disable it in the class name but keep it in the explain result.
I think it's always good to have the id in explain output and generated classes. The only concern is that we may have codegen cache issues if we put the id in the class name, so we need a config to turn that off.
Makes sense to me.
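In code, the trade-off discussed in this thread boils down to something like the following sketch (the function and parameter names are illustrative, not the PR's API): the config only switches the class-name text, which is exactly the part that feeds the codegen cache key.

```scala
// Illustrative only: how the generated class name would depend on the
// new config. With the ID disabled, similar stages can share identical
// source text and thus hit the codegen cache.
def generatedClassName(useIdInClassName: Boolean, codegenStageId: Int): String =
  if (useIdInClassName) s"GeneratedIteratorForCodegenStage$codegenStageId"
  else "GeneratedIterator"

// e.g. generatedClassName(useIdInClassName = true, codegenStageId = 2)
//      == "GeneratedIteratorForCodegenStage2"
```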
Test build #86627 has finished for PR 20224.
retest this please.
LGTM with minor comments.
Force-pushed from a11232e to 5c99777 (Compare)
Updated again. Addressed @viirya's comments: documented how the codegen stage ID is used (it shows up in the explain output and in the generated class name), and moved the assert message into a comment.
```scala
// a different query can result in codegen cache miss, that's by design
spark.range(5).select('id * 2).collect
val after3 = bytecodeSizeHisto.getCount
assert(after3 >= after2, "always")
```
hmm, I think we can just remove the above 3 lines and add a comment before L258, to say that the CodegenMetrics are always monotonically increasing, so `after1 == after2` proves we hit the codegen cache.
I like that. Updating now.
Force-pushed from 5c99777 to ce8171a (Compare)
Updated again to address @cloud-fan's comments: removed the unneeded test case and added a bit more comments.
Test build #86631 has finished for PR 20224.
Test build #86632 has finished for PR 20224.
Test build #86636 has finished for PR 20224.
Test build #86638 has finished for PR 20224.
Force-pushed from ce8171a to b2e2cb0 (Compare)
```diff
     }

     ${ctx.registerComment(s"""Codegend pipeline for\n${child.treeString.trim}""")}
-    final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
+    final class $className extends ${classOf[BufferedRowIterator].getName} {
```
Can we always add `codegenStageId` as a comment by using `ctx.registerComment()`?
Yes, that's a great idea that I missed! Thanks for your comments, @kiszk!

We can get that effect in two ways:

1. Change the current line `${ctx.registerComment(s"""Codegend pipeline for\n${child.treeString.trim}""")}` from `child.treeString` to `this.treeString`, which will include the codegen stage ID through the `treeString`, just like the explain output.
2. Simply add `$codegenStageId` into the `Codegend pipeline for` line.
3. Do both of the above. (Did I say two...?)

Which do you prefer?

```diff
$ git diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index b0090af77e..0e525b1e22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -538,7 +538,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       return new $className(references);
     }

-    ${ctx.registerComment(s"""Codegend pipeline for\n${child.treeString.trim}""")}
+    ${ctx.registerComment(
+      s"""Codegend pipeline for stage (id=$codegenStageId)
+         |${this.treeString.trim}""".stripMargin)}
     final class $className extends ${classOf[BufferedRowIterator].getName} {
       private Object[] references;
```

An example of the generated code with comments enabled is:

```java
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for stage (id=2)
 * *(2) Project [(id#0L + 1) AS x#4L]
 * +- *(2) Sort [id#0L ASC NULLS FIRST], true, 0
 *    +- Exchange rangepartitioning(id#0L ASC NULLS FIRST, 200)
 *       +- *(1) Range (0, 1, step=1, splits=8)
 */
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
...
```
Thanks for your changes. They look very good.

I missed one fact (I was sleepy :)): `ctx.registerComment()` is enabled only when `spark.sql.codegen.comments` is `true`. It would be good to add the id in the comment regardless of `spark.sql.codegen.comments`, since this comment is very small. I could create a follow-up PR this afternoon.
Yes, I like that idea too. Please ping me on the follow-up PR as well. Thanks!
Test build #86655 has finished for PR 20224.
Also added a new test case to HiveExplainSuite to make sure the codegen stage ID is indeed included in the explain output of the physical plan, and another new test case in WholeStageCodegenSuite to make sure that, with the ID included in the generated class name, the generated code can still hit the codegen cache for the same query.
Force-pushed from b2e2cb0 to fd8983e (Compare)
[SPARK-23032][SQL] Add a per-query codegenStageId to WholeStageCodegenExec

Author: Kris Mok <kris.mok@databricks.com>

Closes #20224 from rednaxelafx/wsc-codegenstageid.

(cherry picked from commit e57f394)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Test build #86660 has finished for PR 20224.
Thanks! Merged to master/2.3
What changes were proposed in this pull request?

Proposal

Add a per-query ID to the codegen stages as represented by `WholeStageCodegenExec` operators. This ID will be used in

- the explain output of the physical plan, and in
- the generated class name.

Specifically, this ID will be stable within a query, counting up from 1 in depth-first post-order for all the `WholeStageCodegenExec` inserted into a plan.

The ID value 0 is reserved for "free-floating" `WholeStageCodegenExec` objects, which may have been created for one-off purposes, e.g. for fallback handling of codegen stages that failed to codegen the whole stage and wish to codegen a subset of the children operators (as seen in `org.apache.spark.sql.execution.FileSourceScanExec#doExecute`).

Example: for the following query:

```scala
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]

scala> val df2 = spark.range(5)
df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> val query = df1.join(df2, 'z === 'id)
query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]
```

The explain output before the change is:

```scala
scala> query.explain
== Physical Plan ==
*SortMergeJoin [z#9L], [id#13L], Inner
:- *Sort [z#9L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(z#9L, 200)
:     +- *Project [(x#3L + 1) AS z#9L, y#4L]
:        +- *Sort [x#3L ASC NULLS FIRST], true, 0
:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
:              +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
:                 +- *Range (0, 10, step=1, splits=8)
+- *Sort [id#13L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#13L, 200)
      +- *Range (0, 5, step=1, splits=8)
```

Note how codegen'd operators are annotated with a prefix `"*"`. See how the `SortMergeJoin` operator and its direct children `Sort` operators are adjacent and all annotated with the `"*"`, so it's hard to tell they're actually in separate codegen stages.

and after this change it'll be:

```scala
scala> query.explain
== Physical Plan ==
*(6) SortMergeJoin [z#9L], [id#13L], Inner
:- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(z#9L, 200)
:     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
:        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
:              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
:                 +- *(1) Range (0, 10, step=1, splits=8)
+- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#13L, 200)
      +- *(4) Range (0, 5, step=1, splits=8)
```

Note that the annotated prefix becomes `"*(id) "`. See how the `SortMergeJoin` operator and its direct children `Sort` operators have different codegen stage IDs.

It'll also show up in the name of the generated class, as a suffix in the format of `GeneratedClass$GeneratedIterator$id`.

For example, note how `GeneratedClass$GeneratedIteratorForCodegenStage3` and `GeneratedClass$GeneratedIteratorForCodegenStage6` in the following stack trace correspond to the IDs shown in the explain output above:

```
"Executor task launch worker for task 42412957" daemon prio=5 tid=0x58 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter$(generated.java:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:41)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.findNextInnerJoinRows$(generated.java:42)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(generated.java:101)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
```

Rationale

Right now, the codegen from Spark SQL lacks the means to differentiate between a couple of things:

1. It's hard to tell which physical operators are in the same WholeStageCodegen stage. Note that this "stage" is a separate notion from Spark's RDD execution stages; this one is only to delineate codegen units. There can be adjacent physical operators that are both codegen'd but are in separate codegen stages. Some of this is due to hacky implementation details, such as the case with `SortMergeJoin` and its `Sort` inputs -- they're hard coded to be split into separate stages although both are codegen'd. When printing out the explain output of the physical plan, you'd only see the codegen'd physical operators annotated with a preceding star (`'*'`) but would have no way to figure out if they're in the same stage.

2. Performance/error diagnosis. The generated code has class/method names that are hard to differentiate between queries or even between codegen stages within the same query. If we use a Java-level profiler to collect profiles, or if we encounter a Java-level exception with a stack trace in it, it's really hard to tell which part of a query it's at.

By introducing a per-query codegen stage ID, we'd at least be able to know in which codegen stage (and in turn, in which group of physical operators) a profile tick or an exception happened.

The reason why this proposal uses a per-query ID is because it's stable within a query, so that multiple runs of the same query will see the same resulting IDs. This both benefits understandability for users, and also plays well with the codegen cache in Spark SQL, which uses the generated source code as the key.

The downside to using per-query IDs as opposed to a per-session or globally incrementing ID is of course that we can't tell apart different query runs with this ID alone. But for now I believe this is a good enough tradeoff.

How was this patch tested?

Existing tests. This PR does not involve any runtime behavior changes other than some name changes. The SQL query test suites that compare explain outputs have been updated to ignore the newly added `codegenStageId`.