[SPARK-23032][SQL] Add a per-query codegenStageId to WholeStageCodegenExec #20224


Closed
wants to merge 1 commit

Conversation

rednaxelafx
Contributor

@rednaxelafx commented Jan 10, 2018

What changes were proposed in this pull request?

Proposal

Add a per-query ID to the codegen stages as represented by WholeStageCodegenExec operators. This ID will be used in

  • the explain output of the physical plan, and in
  • the generated class name.

Specifically, this ID will be stable within a query, counting up from 1 in depth-first post-order for all the WholeStageCodegenExec operators inserted into a plan.
The ID value 0 is reserved for "free-floating" WholeStageCodegenExec objects, which may have been created for one-off purposes, e.g. for fallback handling of codegen stages that failed to codegen the whole stage and wish to codegen only a subset of the child operators (as seen in org.apache.spark.sql.execution.FileSourceScanExec#doExecute).

Example: for the following query:

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]

scala> val df2 = spark.range(5)
df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> val query = df1.join(df2, 'z === 'id)
query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]

The explain output before the change is:

scala> query.explain
== Physical Plan ==
*SortMergeJoin [z#9L], [id#13L], Inner
:- *Sort [z#9L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(z#9L, 200)
:     +- *Project [(x#3L + 1) AS z#9L, y#4L]
:        +- *Sort [x#3L ASC NULLS FIRST], true, 0
:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
:              +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
:                 +- *Range (0, 10, step=1, splits=8)
+- *Sort [id#13L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#13L, 200)
      +- *Range (0, 5, step=1, splits=8)

Note how codegen'd operators are annotated with a prefix "*". See how the SortMergeJoin operator and its direct child Sort operators are adjacent and all annotated with "*", so it's hard to tell they're actually in separate codegen stages.

and after this change it'll be:

scala> query.explain
== Physical Plan ==
*(6) SortMergeJoin [z#9L], [id#13L], Inner
:- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(z#9L, 200)
:     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
:        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
:              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
:                 +- *(1) Range (0, 10, step=1, splits=8)
+- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#13L, 200)
      +- *(4) Range (0, 5, step=1, splits=8)

Note that the annotated prefix becomes "*(id) ". See how the SortMergeJoin operator and its direct child Sort operators have different codegen stage IDs.

It'll also show up in the name of the generated class, as a suffix in the format of GeneratedClass$GeneratedIteratorForCodegenStage<id>.

For example, note how GeneratedClass$GeneratedIteratorForCodegenStage3 and GeneratedClass$GeneratedIteratorForCodegenStage6 in the following stack trace correspond to the IDs shown in the explain output above:

"Executor task launch worker for task 424@12957" daemon prio=5 tid=0x58 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter$(generated.java:32)
	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:41)
	  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494)
	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.findNextInnerJoinRows$(generated.java:42)
	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(generated.java:101)
	  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513)
	  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
	  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
	  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	  at org.apache.spark.scheduler.Task.run(Task.scala:109)
	  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:748)

Rationale

Right now, the codegen from Spark SQL lacks the means to differentiate between a couple of things:

  1. It's hard to tell which physical operators are in the same WholeStageCodegen stage. Note that this "stage" is a separate notion from Spark's RDD execution stages; this one is only to delineate codegen units.
    There can be adjacent physical operators that are both codegen'd but are in separate codegen stages. Some of this is due to hacky implementation details, such as the case with SortMergeJoin and its Sort inputs -- they're hard coded to be split into separate stages although both are codegen'd.
    When printing out the explain output of the physical plan, you'd only see the codegen'd physical operators annotated with a preceding star ('*') but would have no way to figure out if they're in the same stage.

  2. Performance/error diagnosis
    The generated code has class/method names that are hard to differentiate between queries or even between codegen stages within the same query. If we use a Java-level profiler to collect profiles, or if we encounter a Java-level exception with a stack trace in it, it's really hard to tell which part of a query it's at.
    By introducing a per-query codegen stage ID, we'd at least be able to know which codegen stage (and in turn, which group of physical operators) a profile tick or an exception came from.

The reason why this proposal uses a per-query ID is that it's stable within a query, so multiple runs of the same query will see the same resulting IDs. This benefits understandability for users, and it also plays well with the codegen cache in Spark SQL, which uses the generated source code as the key.

The downside of using per-query IDs, as opposed to a per-session or globally incrementing ID, is of course that we can't tell different query runs apart with this ID alone. But for now I believe this is a good enough tradeoff.

How was this patch tested?

Existing tests. This PR does not involve any runtime behavior changes other than some name changes.
The SQL query test suites that compare explain outputs have been updated to ignore the newly added codegenStageId.

@rednaxelafx
Contributor Author

jenkins retest this please

@rednaxelafx
Contributor Author

One comment as to using ThreadLocal[Integer] for keeping track of the IDs: I did have an alternative implementation of this PR that declares WholeStageCodegenExec as:

case class WholeStageCodegenExec(child: SparkPlan)(private val codegenStageId: Int)
    extends UnaryExecNode with CodegenSupport

and then explicitly thread the codegenStageId recursively in CollapseCodegenStages.insertWholeStageCodegen(), so that the relationship between the auto-increment of IDs and the insertion order of WholeStageCodegenExecs is explicit.

However that turned out to be much more complicated than just using a ThreadLocal[Integer] and implicitly threading the IDs. So in the end I opted for the thread-local counter version instead.
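For reference, here's a minimal sketch of the thread-local counter approach (the object and method names are illustrative, not necessarily the exact ones in the patch):

object PerQueryCodegenStageId {
  // 0 is reserved for "free-floating" WholeStageCodegenExec objects,
  // so the per-query counter starts counting from 1.
  private val counter: ThreadLocal[Integer] =
    ThreadLocal.withInitial(new java.util.function.Supplier[Integer] {
      override def get(): Integer = 1 // anonymous class: no SAM lambdas before Scala 2.12
    })

  // Called once at the start of CollapseCodegenStages for each query.
  def resetPerQuery(): Unit = counter.set(1)

  // Called for each WholeStageCodegenExec inserted, in depth-first post-order.
  def getNextStageId(): Int = {
    val id = counter.get()
    counter.set(id + 1)
    id
  }
}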

@SparkQA

SparkQA commented Jan 11, 2018

Test build #85940 has finished for PR 20224 at commit fa25f72.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • final class $generatedClassName extends $

@kiszk
Member

kiszk commented Jan 11, 2018

I totally agree with adding a unique ID to the generated class. Right now, all of the code generated by whole-stage codegen has the same class name, which makes it hard to debug in a production environment.

On the other hand, IIUC, the current implementation disables the caching mechanism for the same query in Spark, since adding a unique ID generates a different source string for the Java code. WDYT?

I am thinking about adding a task-related ID into a comment or other parts instead.

@maropu
Member

maropu commented Jan 11, 2018

Do we always need to turn this on? It seems like this is debug info for developers?

@rednaxelafx
Contributor Author

rednaxelafx commented Jan 11, 2018

Thanks for your comments and questions, @kiszk and @maropu !
Let me address them in a couple of separate points.

tl;dr

On top of my original proposal in the PR description / JIRA ticket, I'd like to further add:
a. A config option to choose whether or not to include the codegenStageId in the generated class name. The default should be "off", meaning the ID is not included in the class name.
b. Reserve the [0] element of the references array in the generated WSC class as a special slot that records the codegen stage ID. That way, if we need to throw an exception from the generated code, we can include the codegen stage ID when constructing the exception message string. This doesn't add any new IDs to the generated source, so @kiszk 's concern about the codegen cache is addressed. This can always be turned on.

Side note: Putting the codegen stage ID into the references array solves the codegen cache hit problem neatly -- this array is Spark SQL's way of expressing a "runtime constant pool" anyway. This idea is somewhat similar to how HotSpot VM's "LambdaForm bytecode sharing" works.
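To illustrate idea (b) (purely a sketch, not part of this PR; the helper name withStageId is hypothetical, and references stands for the constant-pool entries collected during codegen):

// Sketch only: prepend the stage ID to the "runtime constant pool" handed to
// the generated class, so the generated source text itself stays ID-free.
def withStageId(codegenStageId: Int, references: Seq[Any]): Array[Any] =
  (codegenStageId: java.lang.Integer) +: references.toArray
// The generated Java could then recover it when building an error message:
//   int stageId = ((Integer) references[0]).intValue();
//   throw new RuntimeException("error in codegen stage " + stageId, cause);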

Detail Discussions

My proposal and PR currently do 3 things:

  1. Add a per-query codegenStageId to WholeStageCodegenExec;
  2. Include the ID as a part of the explain output for physical plans;
  3. Include the ID as a part of the generated class name for WSC.

Of the above, (1) is the foundation, while (2) and (3) are separate applications of the information from (1).

Would you (@kiszk and @maropu ) agree that at least having both (1) and (2) is a good idea? They don't interact with anything else at runtime, so there's no behavioral change or performance implication from them. They can always be turned on with minimal overhead.

@rxin did point out that our current explain output for physical plans is already pretty cluttered and not user-friendly enough, so it makes sense to have a "verbose mode" in the future and then make the default mode less cluttered. But that's out of scope for this change.

For (3), @kiszk does point out that there's an interaction between the generated code (in source string + comments form) and the codegen cache (from CodeAndComment -> generated class, in org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator#cache). I do know about this cache and took this interaction into consideration when I sketched out this proposal.

This PR proposes an ID unique within a query. If the same query is run multiple times, it'll generate the exact same code (with the IDs included), so at least with the current implementation, we can guarantee that there won't be redundant compilation for multiple runs of the same query. I mentioned this in the PR description:

The reason why this proposal uses a per-query ID is because it's stable within a query, so that multiple runs of the same query will see the same resulting IDs. This both benefits understandability for users, and also it plays well with the codegen cache in Spark SQL which uses the generated source code as the key.
This kind of codegen cache hit is fundamental, and this PR keeps it working.

Within a query, though, before this change there could be codegen stages that happen to have the exact same source code and thus work well with the codegen cache. After this change, such cases would generate code with different IDs embedded in the class name, so they'll have different source code, miss the codegen cache, and have to be compiled separately.

Here's an example that would hit this case:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
val df1 = spark.range(5).select('id % 2 as 'x)
val df2 = spark.range(5).select('id % 2 as 'y)
val query = df1.join(df2, 'x === 'y)

With this change, you can see the different codegen stages as follows:

scala> query.explain
== Physical Plan ==
*(5) SortMergeJoin [x#3L], [y#9L], Inner
:- *(2) Sort [x#3L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(x#3L, 200)
:     +- *(1) Project [(id#0L % 2) AS x#3L]
:        +- *(1) Filter isnotnull((id#0L % 2))
:           +- *(1) Range (0, 5, step=1, splits=8)
+- *(4) Sort [y#9L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(y#9L, 200)
      +- *(3) Project [(id#6L % 2) AS y#9L]
         +- *(3) Filter isnotnull((id#6L % 2))
            +- *(3) Range (0, 5, step=1, splits=8)

The generated code for codegen stages (1) and (3) are actually identical, so the codegen cache will save us from redundantly compiling one of the stages.

I would argue that this kind of codegen cache hit is accidental, brittle, and shouldn't be considered a must have.
For example, if df1 was declared as spark.range(3)... instead of spark.range(5)..., the generated code for codegen stages (1) and (3) would have been different (see here for details). In more realistic scenarios, the generated code shape is very sensitive to the surrounding operators and certain constants we directly embed into the code. Such generated code wouldn't reliably hit the codegen cache anyway.

On the other hand, having such IDs in the class name is very useful for all kinds of diagnosis, not just for Spark developers.
e.g. it can tell users where an exception or profile tick happened; it can tell exactly which generated method went above some certain bytecode size threshold, etc.

But to avoid having any regressions whatsoever, I do agree we should have a config option to choose whether or not to embed this codegen stage ID into the generated class name.

WDYT? @kiszk and @maropu ?

@viirya
Member

viirya commented Jan 12, 2018

Would you (@kiszk and @maropu ) agree that at least having both (1) and (2) is a good idea?

Without (3), is this still useful if we only have (1) and (2)? It may not be very useful to only have the codegen ID in the explain output.

@rednaxelafx
Contributor Author

Thanks for your comments, @viirya !

I'd say only having (1) and (2) makes it much less useful than having all 3, but it's still useful on its own for helping people understand exactly which physical operators were fused into a single codegen stage (as opposed to assuming adjacent codegen'd operators are always in the same codegen stage).
The SortMergeJoin case was something that I really wished we had such an ID readily available in the explain output. I had learned the hacky implementation of SMJ the hard way...

With (3) and the new proposal of reserving references[0] for the codegenStageId, I'm sure it'll be useful for some of your use cases (especially codegen-related development), too. Do you have any use cases off the top of your head, or any suggestions as to whether or not such an ID makes sense in general?

Thanks!

@kiszk
Member

kiszk commented Jan 12, 2018

As a high-level comment, adding IDs helps performance/error diagnosis in production environments. I strongly support always enabling this.
Let me look at the technical details later.

@@ -312,6 +313,24 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
object WholeStageCodegenExec {
val PIPELINE_DURATION_METRIC = "duration"

private val codegenStageCounter = ThreadLocal.withInitial(new Supplier[Integer] {
Member

Instead of adding this logic into object WholeStageCodegenExec, could we just create a new object?

Contributor Author

But IMHO this is the place to put it, since it's closely tied to the initialization of WholeStageCodegenExec object instances.
Are you suggesting something like an object WholeStageCodegenId, moving the newly added methods there instead? That would work too.

Member

Yeah. Creating a new object WholeStageCodegenId is better.

@gatorsmile
Member

Overall, the proposal looks good to me. We need a test case in HiveExplainSuite.

insertWholeStageCodegen(plan)
WholeStageCodegenExec.initializeCodegenStageCounterPerQuery()
val newPlan = insertWholeStageCodegen(plan)
WholeStageCodegenExec.resetCodegenStageCounter()
Member

Do we need to reset the counter? Is it a bad idea to just use a simple incrementing counter like exprId?

Contributor Author

It's exactly the concern that @kiszk brought up: the codegen cache uses the generated source code as the key, so any differences in the source code text would break the cache hit.
Imagine the counter were globally (or Spark session-locally) atomically incrementing: if the same query were run twice, the codegen stages in those two runs would get different sets of IDs, resulting in different source code text (everything the same except the IDs), which would render the codegen cache useless -- basically nothing would ever hit the cache since the IDs would always be different.

In fact, you'll find that the IDs in the explain output through df.explain() are going to be different from the ones you see in Spark UI's SQL tab's treeString, because explain is actually "one query with the ExplainCommand as the root". I had hit this exact problem in my early prototype and soon realized this isn't going to be user-friendly.

By making the ID only increment within a query, we can make sure the codegen cache works for multiple runs of the same (or identically structured) query, and still be able to differentiate the codegen stages within a query.

@rednaxelafx
Contributor Author

Thanks @gatorsmile ! Will add a new test case in HiveExplainSuite.

@rednaxelafx
Contributor Author

rednaxelafx commented Jan 23, 2018

I've updated the PR addressing @gatorsmile 's comments: moved the new utility code to WholeStageCodegenId object and added a new test case in HiveExplainSuite.

I've also added a config flag in SQLConf to choose whether or not to include the codegen stage ID in the generated class name: spark.sql.codegen.wholeStage.useIdInClassName. The default is currently set to false so that behavior is exactly the same as before in terms of codegen cache hits. I'd like to get more suggestions on this one. cc @kiszk

Also added a new test case to WholeStageCodegenSuite to make sure that even when the codegen stage ID is included in the generated class name, the same query will still hit the codegen cache.
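For instance, opting in with the current default would be a one-liner (a sketch, assuming a SparkSession named spark is in scope):

// Enable embedding the codegen stage ID in generated class names
// (default is false as of this update):
spark.conf.set("spark.sql.codegen.wholeStage.useIdInClassName", "true")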

ping @gatorsmile @kiszk @maropu @viirya to have a second look. Thanks!

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86516 has finished for PR 20224 at commit a7ceda2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"When true, embed the codegen stage ID into the class name of the generated class\")
  • final class $className extends $

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86515 has finished for PR 20224 at commit a0162aa.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"When true, embed the codegen stage ID into the class name of the generated class\")
  • final class $className extends $

@rednaxelafx
Contributor Author

jenkins retest this please

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86521 has finished for PR 20224 at commit a7ceda2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"When true, embed the codegen stage ID into the class name of the generated class\")
  • final class $className extends $

@rednaxelafx
Contributor Author

also ping @cloud-fan


spark.range(5).select('id * 2).collect
val after3 = bytecodeSizeHisto.getCount
assert(after3 >= after2, "a different query can result in codegen cache miss, that's okay")
Contributor

after3 > after2?

Contributor Author

@rednaxelafx Jan 24, 2018

I actually deliberately wrote it this way. Note how I phrased the assertion message as "can result in codegen cache miss" instead of "will result in".

That's because the code shape of this third query was deliberately chosen to be similar to the two queries before it: all three have spark.range(some_const).select(some_expr).collect, so if any future changes to codegen of Range or Project operators affect how much specialized code (such as constant values) we directly embed into the code, it's actually possible for this third query to generate the same code as the first two, which will result in a codegen cache hit -- the bytecode-compiled method count would therefore be the same.

So I'm making this check a bit loose. It's just there to indicate that it's acceptable for a different query to encounter a codegen cache miss. WDYT?

Contributor

makes sense

* is created, e.g. for special fallback handling when an existing WholeStageCodegenExec
* failed to generate/compile code.
*/
val codegenStageId = WholeStageCodegenId.getNextStageId()
Contributor

my major concern is: what if we transform the physical plan tree after adding WholeStageCodegenExec? e.g. during transformation we may copy a plan node, and then the copied WholeStageCodegenExec will have a different stage id.

Contributor Author

@rednaxelafx Jan 24, 2018

Thanks for your comment, @cloud-fan ! That's a nice catch that I hadn't really thought about.

All the examples that I've run with are ones that wouldn't trigger changes to the plan after CollapseCodegenStages, which means for those examples ReuseExchange / ReuseSubqueries wouldn't have triggered.
Yes, these two rules could potentially change the physical plan, which means when we transformUp in those rules (and any future rule after CollapseCodegenStages) it'd create new WholeStageCodegenExec objects outside of CollapseCodegenStages, and with my current implementation that'll result in the WSC copies having a codegen stage ID of 0.

One way to work around this is to move CollapseCodegenStages to always be the last rule in org.apache.spark.sql.execution.QueryExecution#preparations, so that we're sure no other transformation on the physical plan could change its structure, except for the fallback handling that can happen in a couple of doExecute()s -- those exception cases are expected, and to me they are acceptable.

If we go down that route, I'll probably have to tweak CollapseCodegenStages a little bit so that it can cope with the physical query plan potentially becoming a DAG instead of a tree, as the ReuseExchange / ReuseSubqueries rules may do that kind of transformation. This tweak is easy to implement and low risk: simply bailing out of transforming a subtree when it sees a WholeStageCodegenExec already inserted into the plan would suffice.
^^ scratch that. I'll need something a bit more involved to deal with DAGs in this case.

Let me actually update the PR with this tweak and see what happens in tests.

@rednaxelafx
Contributor Author

BTW, inspired by @cloud-fan 's comment, here's an example of the codegen stage IDs when scalar subqueries are involved:

val sub = "(select sum(id) from range(5))"
val df = spark.sql(s"select $sub as a, $sub as b")
df.explain(true)

would give:

== Parsed Logical Plan ==
'Project [scalar-subquery#0 [] AS a#1, scalar-subquery#2 [] AS b#3]
:  :- 'Project [unresolvedalias('sum('id), None)]
:  :  +- 'UnresolvedTableValuedFunction range, [5]
:  +- 'Project [unresolvedalias('sum('id), None)]
:     +- 'UnresolvedTableValuedFunction range, [5]
+- OneRowRelation

== Analyzed Logical Plan ==
a: bigint, b: bigint
Project [scalar-subquery#0 [] AS a#1L, scalar-subquery#2 [] AS b#3L]
:  :- Aggregate [sum(id#14L) AS sum(id)#16L]
:  :  +- Range (0, 5, step=1, splits=None)
:  +- Aggregate [sum(id#17L) AS sum(id)#19L]
:     +- Range (0, 5, step=1, splits=None)
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#0 [] AS a#1L, scalar-subquery#2 [] AS b#3L]
:  :- Aggregate [sum(id#14L) AS sum(id)#16L]
:  :  +- Range (0, 5, step=1, splits=None)
:  +- Aggregate [sum(id#17L) AS sum(id)#19L]
:     +- Range (0, 5, step=1, splits=None)
+- OneRowRelation

== Physical Plan ==
*(1) Project [Subquery subquery0 AS a#1L, Subquery subquery2 AS b#3L]
:  :- Subquery subquery0
:  :  +- *(2) HashAggregate(keys=[], functions=[sum(id#14L)], output=[sum(id)#16L])
:  :     +- Exchange SinglePartition
:  :        +- *(1) HashAggregate(keys=[], functions=[partial_sum(id#14L)], output=[sum#21L])
:  :           +- *(1) Range (0, 5, step=1, splits=8)
:  +- Subquery subquery2
:     +- *(2) HashAggregate(keys=[], functions=[sum(id#17L)], output=[sum(id)#19L])
:        +- Exchange SinglePartition
:           +- *(1) HashAggregate(keys=[], functions=[partial_sum(id#17L)], output=[sum#23L])
:              +- *(1) Range (0, 5, step=1, splits=8)
+- Scan OneRowRelation[]

The reason why the IDs look a bit "odd" (there are three separate codegen stages with ID 1) is that the main "spine" query and each individual subquery are "planned" separately, so they run CollapseCodegenStages separately, each counting up from 1 afresh. I would consider this behavior acceptable, but I wonder what others would think in this case.
If this behavior for subqueries is not acceptable, I'll have to find alternative places to put the initialization and reset of the thread-local ID counter.

@rednaxelafx
Contributor Author

Updated the PR:

  1. addressed @cloud-fan 's comment to make sure the codegenStageId is properly copied in transformations after CollapseCodegenStages. Added a new unit test case for it.

The test case triggers ReuseExchange, which is a rule that runs after CollapseCodegenStages.
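A self-join is one query shape that exercises this rule (a sketch consistent with the plan below, not necessarily the exact test query):

val df = spark.range(100)
// Both join sides need the same hash partitioning, so the ReuseExchange rule
// replaces the second Exchange with a ReusedExchange node -- after the codegen
// stages have already been inserted by CollapseCodegenStages.
val join = df.join(df, "id")
join.explain()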
Before this update, the explain output for the test query is:

== Physical Plan ==
*(0) Project [id#7L]
+- *(0) SortMergeJoin [id#7L], [id#10L], Inner
   :- *(2) Sort [id#7L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#7L, 200)
   :     +- *(1) Range (0, 100, step=1, splits=8)
   +- *(0) Sort [id#10L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#10L], Exchange hashpartitioning(id#7L, 200)

Note the *(0)s, which indicate that the codegenStageIds are not properly copied. After this update, it is now:

== Physical Plan ==
*(5) Project [id#0L]
+- *(5) SortMergeJoin [id#0L], [id#3L], Inner
   :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *(1) Range (0, 100, step=1, splits=8)
   +- *(4) Sort [id#3L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200)
  2. Flipped the default value of the new conf option "spark.sql.codegen.wholeStage.useIdInClassName" to true.

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86612 has finished for PR 20224 at commit e449216.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"When true, embed the codegen stage ID into the class name of the generated class\")
  • case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
  • final class $className extends $

@@ -629,6 +629,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val WHOLESTAGE_CODEGEN_USE_ID_IN_CLASS_NAME =
buildConf("spark.sql.codegen.wholeStage.useIdInClassName")
Contributor

nit: It seems other whole stage codegen configs don't have the wholeStage prefix, shall we remove it to make them consistent?

Contributor Author

I'm open to suggestions for the config option name. Do you have any concrete suggestions?
Looks like you're suggesting spark.sql.codegen.useIdInClassName, right?

I chose the current name (prefix) for two reasons:

  1. the config option right before mine is named spark.sql.codegen.wholeStage, and I just used it as a prefix
  2. this option only affects whole-stage codegen and not other (expression/predicate/ordering/encoder) codegens.

But you're making a good point that all the other whole-stage codegen config options (the ones following this one) only use spark.sql.codegen as the prefix. So if you'd confirm that I understood your suggestion correctly, I'll update the PR to address it.

Thanks!

@@ -325,6 +326,28 @@ object WholeStageCodegenExec {
}
}

object WholeStageCodegenId {
private val codegenStageCounter = ThreadLocal.withInitial(new Supplier[Integer] {
override def get() = 0 // TODO: change to Scala lambda syntax when upgraded to Scala 2.12+
Contributor

@cloud-fan Jan 25, 2018

shall we just use 1 as the initial value and add a comment to say that 0 is reserved for temporary WholeStageCodegenExec objects? Then we only need an initialize method.

Contributor Author

With the updated PR that uses the secondary constructor in WholeStageCodegenExec, yes you're making a good point. All the places that create temporary WholeStageCodegenExec objects are explicitly passing in 0 as the codegen stage ID now, so we can indeed simplify the counter logic here.

Will address in the next update.

@cloud-fan
Contributor

LGTM

@cloud-fan
Contributor

LGTM, pending jenkins


spark.range(5).select('id * 2).collect
val after3 = bytecodeSizeHisto.getCount
assert(after3 >= after2, "a different query can result in codegen cache miss, that's okay")
Member

nit: "a different query can result in codegen cache miss, that's okay" seems like a misleading error message for the assert.

Contributor Author

That assert is actually "useless" in the sense that the CodegenMetrics are always monotonically increasing, so after3 >= after2 will always be true (note the = in there). I only put it in to show the intent that it's by design that a different query can cause codegen cache miss.

Would you have any concrete suggestions for wording that assertion message? I'm totally open to suggestions here. I can move the current message into a comment and make the assert message look more like an assert message.
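For context, the pattern under discussion is roughly the following (a sketch; the CodegenMetrics import path and metric name are assumptions, and spark.implicits._ is assumed in scope for the 'id syntax):

import org.apache.spark.metrics.source.CodegenMetrics
import spark.implicits._

// CodegenMetrics histograms only ever grow, so equal counts before and after
// a re-run prove the second run hit the codegen cache.
val bytecodeSizeHisto = CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE
spark.range(3).select('id * 2).collect()
val after1 = bytecodeSizeHisto.getCount
spark.range(3).select('id * 2).collect()
val after2 = bytecodeSizeHisto.getCount
assert(after1 == after2, "the same query run twice should hit the codegen cache")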

Member

It is an error message, so it's weird to use it to explain this assert. I think you can make it a comment to explain the assert and remove the error message.

Contributor

ok, seems this assert is not very useful, how about we just remove it? We can just add a comment before assert(after1 == after2, "the same query run twice should hit the codegen cache") and say that different queries can cause codegen cache misses, so this assert proves the query is the same?

Member

+1

Contributor Author

Thanks for the suggestions, @viirya and @cloud-fan ! I'll move the assert message to a comment in the next update.

// 0 is reserved as a special ID value to indicate a temporary WholeStageCodegenExec object
// is created, e.g. for special fallback handling when an existing WholeStageCodegenExec
// failed to generate/compile code.

Member

I think we should describe the usage of such a codegen stage id, e.g., that the codegen stage id shows up in the explain string and the generated class name.

Contributor Author

Sure thing. Will address it in the next update. Thanks!

.doc("When true, embed the (whole-stage) codegen stage ID into " +
"the class name of the generated class as a suffix")
.booleanConf
.createWithDefault(true)
Member

Shall we disable the codegen stage id in both the explain result and the generated class name at the same time? It doesn't seem useful to disable it in the class name but keep it in the explain result.

Contributor

I think it's always good to have id in explain and generated classes. The only concern is we may have codegen cache issues if putting id in the class name, so we need a config to turn it off.

Member

Makes sense to me.

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86627 has finished for PR 20224 at commit a11232e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
  • final class $className extends $

@viirya
Member

viirya commented Jan 25, 2018

retest this please.

@viirya
Member

viirya commented Jan 25, 2018

LGTM with minor comments.

@rednaxelafx
Contributor Author

Updated again. Addressed @viirya 's comments:

  1. added comments to explain where this codegen stage ID is used
  2. moved an assertion message to a comment.

// a different query can result in codegen cache miss, that's by design
spark.range(5).select('id * 2).collect
val after3 = bytecodeSizeHisto.getCount
assert(after3 >= after2, "always")
Contributor

hmm, I think we can just remove the above 3 lines and add a comment before L258, to say that the CodegenMetrics are always monotonically increasing, so after1 == after2 proves we hit the codegen cache.

Contributor Author

I like that. Updating now.

@rednaxelafx
Contributor Author

Updated again to address @cloud-fan 's comments: removed unneeded test case and added a bit more comments.

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86631 has finished for PR 20224 at commit a11232e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
  • final class $className extends $

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86632 has finished for PR 20224 at commit a11232e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
  • final class $className extends $

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86636 has finished for PR 20224 at commit 5c99777.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
  • final class $className extends $

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86638 has finished for PR 20224 at commit ce8171a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
  • final class $className extends $

}

${ctx.registerComment(s"""Codegend pipeline for\n${child.treeString.trim}""")}
final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
final class $className extends ${classOf[BufferedRowIterator].getName} {
Member

Can we always add codegenStageId as a comment by using ctx.registerComment()?

Contributor Author

@rednaxelafx Jan 25, 2018

Yes, that's a great idea that I missed! Thanks for your comments, @kiszk !

We can get that effect in two ways:

  1. Change the current line ${ctx.registerComment(s"""Codegend pipeline for\n${child.treeString.trim}""")} from child.treeString to this.treeString, which will include the codegen stage ID through the treeString, just like the explain output.
  2. Simply add $codegenStageId into the Codegend pipeline for line.
  3. Do both of the above. (Did I say two...?)

Which do you prefer?

$ git diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index b0090af77e..0e525b1e22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -538,7 +538,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
         return new $className(references);
       }
 
-      ${ctx.registerComment(s"""Codegend pipeline for\n${child.treeString.trim}""")}
+      ${ctx.registerComment(
+        s"""Codegend pipeline for stage (id=$codegenStageId)
+           |${this.treeString.trim}""".stripMargin)}
       final class $className extends ${classOf[BufferedRowIterator].getName} {
 
         private Object[] references;

An example generated code with comments enabled is:

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ /**
 * Codegend pipeline for stage (id=2)
 * *(2) Project [(id#0L + 1) AS x#4L]
 * +- *(2) Sort [id#0L ASC NULLS FIRST], true, 0
 *    +- Exchange rangepartitioning(id#0L ASC NULLS FIRST, 200)
 *       +- *(1) Range (0, 1, step=1, splits=8)
 */
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
...

Member

Thanks for your changes. They look very good.

I missed one fact (I was sleepy :)). ctx.registerComment() is enabled only when spark.sql.codegen.comments is true. It would be good to add the id in the comment regardless of spark.sql.codegen.comments since this comment is very small.
I could create a follow-up PR this afternoon.

Contributor Author

Yes, I like that idea too. Please ping me on the follow-up PR as well. Thanks!

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86655 has finished for PR 20224 at commit b2e2cb0.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
  • final class $className extends $

Also added a new test case to HiveExplainSuite to make sure the codegen stage ID is indeed included in the explain output of the physical plan,
and another new test case in WholeStageCodegenSuite to make sure with the ID included into the generated class name, the generated code can still hit the codegen cache for the same query.
@asfgit asfgit closed this in e57f394 Jan 26, 2018
asfgit pushed a commit that referenced this pull request Jan 26, 2018
[SPARK-23032][SQL] Add a per-query codegenStageId to WholeStageCodegenExec

Author: Kris Mok <kris.mok@databricks.com>

Closes #20224 from rednaxelafx/wsc-codegenstageid.

(cherry picked from commit e57f394)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
@SparkQA

SparkQA commented Jan 26, 2018

Test build #86660 has finished for PR 20224 at commit fd8983e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
  • final class $className extends $

@gatorsmile
Member

Thanks! Merged to master/2.3
