[SPARK-31399][CORE][test-hadoop3.2][test-java11] Support indylambda Scala closure in ClosureCleaner #28463
Conversation
Working on a new commit that'll add some new test cases for Scala closures declared in Scala REPLs. In the meantime, any comments on the fix itself are very much appreciated!
```scala
}

def inspect(closure: AnyRef): SerializedLambda = {
  val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
```
does this create a copy?
Not a copy, but a "serialization proxy" object that holds on to the symbolic information of the original closure object.
The original closure object contains too much runtime-specific information (e.g. class loader info, a reference to the runtime-generated lambda class) that's not suitable for serialization. So the JDK uses a "serialization proxy" that only holds on to the symbolic info for serialization purposes.
Spark's `ClosureCleaner` abuses this serialization proxy for introspection into what's inside the black box.
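For illustration, here's a minimal sketch of how such a proxy can be obtained reflectively, assuming `closure` is any serializable Scala 2.12 ("indylambda") closure; `writeReplace` is the JDK's standard serialization hook generated on lambda classes:

```scala
import java.lang.invoke.SerializedLambda

// A sketch: invoke the compiler-generated writeReplace() reflectively to get
// the SerializedLambda proxy, then read its symbolic information.
def inspectProxy(closure: AnyRef): Unit = {
  val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
  writeReplace.setAccessible(true)
  val proxy = writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
  println(proxy.getCapturingClass)    // the class that lexically declared the lambda
  println(proxy.getImplMethodName)    // the static $anonfun$... implementation method
  println(proxy.getCapturedArgCount)  // number of captured values (e.g. a REPL line object)
}

inspectProxy((x: Int) => x + 1)  // Scala 2.12 lambdas are serializable by default
```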
```scala
val visited = Set[MethodIdentifier[_]](implMethodId)
val stack = Stack[MethodIdentifier[_]](implMethodId)
while (!stack.isEmpty) {
```
nit: indentation
Looks promising. I remember that the original work to get this working on 2.12 just kind of punted on indylambdas as it mostly Just Worked, but that's an important case that hasn't worked.
Test build #122360 has finished for PR 28463 at commit
Test build #122359 has finished for PR 28463 at commit
I've added a test case to
```scala
 *
 * @param maybeClosure the closure to check.
 */
def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
```
This is moved from `ClosureCleaner.getSerializedLambda` with some enhancements.
```scala
  // && implements a scala.runtime.java8 functional interface
}

def inspect(closure: AnyRef): SerializedLambda = {
```
This is moved from `ClosureCleaner.inspect`, verbatim.
cc @zsxwing
cc @retronym and @skonto for Scala compiler expertise input.
Test build #122364 has finished for PR 28463 at commit
One question about test strategy. Is it better to test old style lambdas (i.e. w/o indy) against the new closure cleaner, too? In other words, does
Thank you for the comment, @kiszk san!
The old/new style closure checks (
That said, it's a good suggestion to have some tests that explicitly cover the old style lambdas. It's possible to use them in Scala 2.12 by specifying
It could be a bit of a hassle to set up a test suite with an exotic REPL option. At least that can't go to the
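For reference, a sketch of how old style lambdas can be produced on Scala 2.12 — assuming the elided option above is `-Ydelambdafy:inline`, which is an assumption on my part, not something confirmed by the thread:

```scala
// Hypothetical session, assuming -Ydelambdafy:inline is the option in question:
// $ scala -Ydelambdafy:inline
scala> val f = (x: Int) => x + 1
f: Int => Int = <function1>  // printed as an anonymous inner class, not an indylambda
```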
Test build #122373 has finished for PR 28463 at commit
```scala
// should be something cleanable, i.e. a Scala REPL line object
val needsCleaning = isClosureDeclaredInScalaRepl &&
  outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == capturingClassName
```
This `isClosureDeclaredInScalaRepl && ...` means only a Scala REPL line object needs cleaning?
It means only Scala closures are supported. Spark's `ClosureCleaner` was never capable of cleaning non-Scala closures, e.g. Java 8's lambdas, because the specifics of how captures work are different, among various other reasons. My new code just makes it explicit in the naming to make sure there's no confusion that this is only geared towards Scala.
```scala
if (op == GETFIELD || op == PUTFIELD) {
  val ownerExternalName = owner.replace('/', '.')
  for (cl <- accessedFields.keys if cl.getName == ownerExternalName) {
    logTrace(s"  found field access $name on $owner")
```
Will logging `ownerExternalName` be more readable?
Indeed! Thanks, I'll address this in an update.
```scala
val implMethodId = MethodIdentifier(
  implClass, lambdaProxy.getImplMethodName, lambdaProxy.getImplMethodSignature)

val visited = Set[MethodIdentifier[_]](implMethodId)
```
`visited` is not used.
Oh shoot, I forgot to add the uses... thanks for catching this!
It should be added right after the `val currentId = stack.pop` line. The initialization should probably start with an empty set instead of pre-populating it with the `implMethodId`, for consistency.
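A minimal sketch of the intended shape, assuming the mutable `Set`/`Stack` and the `MethodIdentifier`/`implMethodId` definitions from the surrounding code:

```scala
import scala.collection.mutable.{Set, Stack}

val visited = Set.empty[MethodIdentifier[_]]      // start empty, for consistency
val stack = Stack[MethodIdentifier[_]](implMethodId)
while (!stack.isEmpty) {
  val currentId = stack.pop
  visited += currentId                            // mark as seen right after popping
  // ... scan currentId's bytecode, pushing only methods not already in `visited`
}
```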
```scala
if (!maybeClosureClass.isSynthetic) return None

val implementedInterfaces = ClassUtils.getAllInterfaces(maybeClosureClass).asScala
val isClosureCandidate = implementedInterfaces.exists(_.getName == "scala.Serializable") &&
```
This detail has changed in Scala 2.13, in which `scala.Serializable` has become a type alias for `java.io.Serializable`. You could safely add that check in here already to future-proof things.
Thank you very much! This is very useful information. Does that mean I should check for `maybeClosure.isInstanceOf[Serializable]` here instead of bothering with explicitly checking the name `scala.Serializable`? Assuming we're running on a JVM instead of running in a no-Java environment, this instanceof check should always be true for Scala closures, right?
Yep, that should work fine.
Scala lambdas can also implement other functional interfaces (such as `java.util.function.Supplier`) when used in a context that expects one. I guess that's out of scope for `ClosureCleaner`, though.
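A sketch of the version-agnostic check being discussed, assuming `maybeClosure: AnyRef`:

```scala
// Works on both Scala 2.12 and 2.13: scala.Serializable is a type alias for
// java.io.Serializable in 2.13, and JVM Scala closures implement it either way.
def looksSerializable(maybeClosure: AnyRef): Boolean =
  maybeClosure.isInstanceOf[java.io.Serializable]
```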
Brainstorming a little. An alternative design of
Pros:
Cons:
Okay, just wanted to put that out there. In case you're not aware, there is a handy secret handshake in the REPL to show the wrappers. Add
We already have Spark-contributed code that inserts the temporary vals like
We also have an extra post-REPL compiler phase that does a countervailing cleanup to remove these temp vals when the import that gave rise to them doesn't actually contribute terms to the code. (A proper fix for SPARK-5150.) I'm wondering if the compiler also ought to bend the rules a little bit and force capture-by-copy of
Capturing early would leak the uninitialized field, but the expression ought not be valid in the REPL because it has a forward reference. The REPL doesn't emit the "invalid forward reference" error for
So with a small change to the compiler we could make the new test pass without changes to
```scala
/**
 * Scans an indylambda Scala closure, along with its lexically nested closures, and populates
 * the accessed-fields info on which fields of the outer object are accessed.
 */
```
I think a code snippet here, together with the resulting `logTrace` output, would be really useful.
That's a very good suggestion. I was intending to write more explanation in the comments but haven't decided on how.
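For instance, here's a sketch of the kind of closure this method scans, borrowed from the PR description. Declared in a REPL, `topLevelValue` becomes a field on the enclosing line object; the scan marks it as an accessed field so every other field can be nulled out in the clone:

```scala
val topLevelValue = "someValue"
// The outer lambda lexically nests two inner lambdas; all three bodies compile to
// static $anonfun$... methods in the same class, so the scan stays in one class.
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}
```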
Force-pushed from 8959f7e to baeba8b
```scala
// so we check first
// non-LMF closures should be less frequent from now on
val lambdaFunc = getSerializedLambda(func)
val maybeIndylambdaProxy = IndylambdaScalaClosures.getSerializationProxy(func)
```
Hi, @rednaxelafx. Do we need to change this PR for Scala 2.11 when we backport into `branch-2.4`?
Never mind. I found your previous comment (#28463 (comment)).
Right, the new `IndylambdaScalaClosures.getSerializationProxy` is pretty much the same as the old `getSerializedLambda`, just with a few more checks.
It's supposed to return `None` for Scala 2.11 closures, and `Some(...)` for Scala 2.12+ closures; no changes.
Alternatively, you can dynamically change (most) compiler settings in the REPL.
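A sketch of what that can look like, assuming the REPL's `:settings` command and the Scala 2.12 `-Ydelambdafy` option (hypothetical session):

```scala
scala> :settings -Ydelambdafy:inline

scala> val f = (x: Int) => x + 1
f: Int => Int = <function1>  // old-style inner-class closure from here on
```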
Test build #122399 has finished for PR 28463 at commit
Force-pushed from baeba8b to c15af43
retest this please
Test build #122726 has finished for PR 28463 at commit
### What changes were proposed in this pull request?
It's quite annoying to be blocked by flaky tests in several PRs. This PR disables them. The tests come from 3 PRs I'm recently watching: #28526 #28463 #28517
### Why are the changes needed?
To make PR builder more stable
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A

Closes #28547 from cloud-fan/test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Test build #5045 has finished for PR 28463 at commit
Test build #5042 has finished for PR 28463 at commit
Test build #5043 has finished for PR 28463 at commit
Test build #5044 has finished for PR 28463 at commit
retest this please
Test build #122754 has finished for PR 28463 at commit
```scala
// This is the InnerClassFinder equivalent for inner classes, which still use the
// `$outer` chain. So this is NOT controlled by the `findTransitively` flag.
logDebug(s"  found inner class $ownerExternalName")
val innerClassInfo = getOrUpdateClassInfo(owner)
```
super nit: How about `val (innerClass, innerClassNode) = ...`?
Thank you for the suggestion. Yes, this was intentional. Using the destructuring pattern-match syntax somehow triggers a problem in the Scala compiler's type inferencer that requires importing `scala.language.existentials` to resolve. I'd rather write the code in a slightly more tedious fashion than import that...
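A sketch of the trade-off, using the `getOrUpdateClassInfo(owner)` call from the snippet above (the destructured form is the reviewer's suggestion; the tuple presumably contains a `Class[_]`, which is what drags in existential types):

```scala
// Suggested form: requires `import scala.language.existentials`, because the
// inferred type of the pattern involves an existential type (Class[_]):
// val (innerClass, innerClassNode) = getOrUpdateClassInfo(owner)

// Form kept in the PR: slightly more tedious, but import-free.
val innerClassInfo = getOrUpdateClassInfo(owner)
val innerClass = innerClassInfo._1
```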
retest this please
Test build #122768 has finished for PR 28463 at commit
retest this please
Test build #122778 has finished for PR 28463 at commit
retest this please
retest this please
retest this please
Test build #122780 has finished for PR 28463 at commit
Test build #122781 has finished for PR 28463 at commit
We don't have many critical changes after the last successful build: #28463 (comment). The failed flaky tests are unrelated to this PR, and we need to unblock 3.0 ASAP. I'm merging it first, and will monitor the Jenkins builds later. Thanks!
…cala closure in ClosureCleaner

Closes #28463 from rednaxelafx/closure-cleaner-indylambda.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dc01b75)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Test FAILed.
…leaner

This is a backport of #28463 from Apache Spark master/3.0 to 2.4. Minor adaptations include:

- Retain the Spark 2.4-specific behavior of skipping the indylambda check when using Scala 2.11
- Remove unnecessary LMF restrictions in ClosureCleaner tests
- Address review comments in the original PR from @kiszk

Tested with the default Scala 2.11 build, and also ran the ClosureCleaner-related tests in the Scala 2.12 build:

- repl: `SingletonReplSuite`
- core: `ClosureCleanerSuite` and `ClosureCleanerSuite2`

Closes #28577 from rednaxelafx/backport-spark-31399-2.4.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
What changes were proposed in this pull request?

This PR proposes to enhance Spark's `ClosureCleaner` to support the "indylambda" style of Scala closures to the same level as the existing implementation for the old (inner class) style ones. The goal is to reach feature parity with the support of the old style Scala closures, with as close to bug-for-bug compatibility as possible.

Specifically, this PR addresses one lacking support for indylambda closures vs the inner class closures:

- When a closure is declared in a Scala REPL and captures the enclosing REPL line object, such a closure should be cleanable (unreferenced fields on the enclosing REPL line object should be cleaned).

This PR maintains the same limitations in the new indylambda closure support as the old inner class closures, in particular the following two:

- Cleaning is only available for one level of REPL line object. If a closure captures state from a REPL line object further out from the immediate enclosing one, it won't be subject to cleaning. See the example below.
- "Sibling" closures are not handled yet. A "sibling" closure is defined here as a closure that is directly or indirectly referenced by the starting closure, but isn't lexically enclosing. e.g.

```scala
{
  val siblingClosure = (x: Int) => x + this.fieldA   // captures `this`, references `fieldA` on `this`.
  val startingClosure = (y: Int) => y + this.fieldB + siblingClosure(y)  // captures `this` and `siblingClosure`, references `fieldB` on `this`.
}
```

The changes are intended to be minimal, with further code cleanups planned in separate PRs.

Jargons:

- old, inner class style Scala closures, aka `delambdafy:inline`: default in Scala 2.11 and before
- new, "indylambda" style Scala closures, aka `delambdafy:method`: default in Scala 2.12 and later

Why are the changes needed?

There had been previous efforts to extend Spark's `ClosureCleaner` to support "indylambda" Scala closures, which is necessary for proper Scala 2.12 support. Most notably the work done for [SPARK-14540](https://issues.apache.org/jira/browse/SPARK-14540).

But the previous efforts had missed one important scenario: a Scala closure declared in a Scala REPL that captures the enclosing `this` -- a REPL line object. e.g. in a Spark Shell:

```scala
:pa
class NotSerializableClass(val x: Int)
val ns = new NotSerializableClass(42)
val topLevelValue = "someValue"
val func = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }
  }
}
<Ctrl+D>
sc.parallelize(0 to 2).map(func).collect
```

In this example, `func` refers to a Scala closure that captures the enclosing `this` because it needs to access `topLevelValue`, which is in turn implemented as a field on the enclosing REPL line object.

The existing `ClosureCleaner` in Spark supports cleaning this case in Scala 2.11-, and this PR brings feature parity to Scala 2.12+.

Note that the existing cleaning logic only supported one level of REPL line object nesting. This PR does not go beyond that. When a closure references state declared a few commands earlier, the cleaning will fail in both Scala 2.11 and Scala 2.12. e.g.

```scala
scala> :pa
// Entering paste mode (ctrl-D to finish)

class NotSerializableClass1(val x: Int)
case class Foo(id: String)
val ns = new NotSerializableClass1(42)
val topLevelValue = "someValue"

// Exiting paste mode, now interpreting.

defined class NotSerializableClass1
defined class Foo
ns: NotSerializableClass1 = NotSerializableClass1@615b1baf
topLevelValue: String = someValue

scala> :pa
// Entering paste mode (ctrl-D to finish)

val closure2 = (j: Int) => {
  (1 to j).flatMap { x =>
    (1 to x).map { y => y + topLevelValue }  // 2 levels
  }
}

// Exiting paste mode, now interpreting.

closure2: Int => scala.collection.immutable.IndexedSeq[String] = <function1>

scala> sc.parallelize(0 to 2).map(closure2).collect
org.apache.spark.SparkException: Task not serializable
...
```

in the Scala 2.11 / Spark 2.4.x case:

```
Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@615b1baf)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw@64df3f4b)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@66e6e5e9)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read@c310aa3)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw@79224636)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@636d4cdc)
	- field (class: $anonfun$1, name: $outer, type: class $iw)
	- object (class $anonfun$1, <function1>)
```

in the Scala 2.12 / Spark master case after this PR:

```
Caused by: java.io.NotSerializableException: NotSerializableClass1
Serialization stack:
	- object not serializable (class: NotSerializableClass1, value: NotSerializableClass1@6f3b4c9a)
	- field (class: $iw, name: ns, type: class NotSerializableClass1)
	- object (class $iw, $iw@2945a3c1)
	- field (class: $iw, name: $iw, type: class $iw)
	- object (class $iw, $iw@152705d0)
	- field (class: $line14.$read, name: $iw, type: class $iw)
	- object (class $line14.$read, $line14.$read@7cf311eb)
	- field (class: $iw, name: $line14$read, type: class $line14.$read)
	- object (class $iw, $iw@d980dac)
	- field (class: $iw, name: $outer, type: class $iw)
	- object (class $iw, $iw@557d9532)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$closure2$1$adapted:(L$iw;Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, instantiatedMethodType=(Ljava/lang/Object;)Lscala/collection/immutable/IndexedSeq;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$2103/815179920, $Lambda$2103/815179920@569b57c4)
```

For more background on the new and old ways Scala lowers closures to Java bytecode, please see [A note on how NSC (New Scala Compiler) lowers lambdas](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-notes-md).

For more background on how Spark's `ClosureCleaner` works and what's needed to make it support "indylambda" Scala closures, please refer to [A Note on Apache Spark's ClosureCleaner](https://gist.github.com/rednaxelafx/e9ecd09bbd1c448dbddad4f4edf25d48#file-spark_closurecleaner_notes-md).

tl;dr

The `ClosureCleaner` works like a mark-sweep algorithm on fields (a minimal sketch follows the list below):

- Finding (a chain of) outer objects referenced by the starting closure;
- Scanning the starting closure and its inner closures and marking the fields on the outer objects that are accessed;
- Cloning the outer objects, nulling out fields that are not accessed by any closure of concern.
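To make that shape concrete, here is a minimal sketch; all three helpers are hypothetical stand-ins for the corresponding passes in `ClosureCleaner`, not its actual API:

```scala
def findOuterObjects(closure: AnyRef): List[AnyRef] = ???  // follow $outer / arg$1 references
def findAccessedFields(
    closure: AnyRef, outers: List[AnyRef]): Map[Class[_], Set[String]] = ???  // scan bytecode
def cloneAndNullOuters(
    closure: AnyRef, outers: List[AnyRef],
    accessed: Map[Class[_], Set[String]]): AnyRef = ???  // clone outers, null unmarked fields

def clean(closure: AnyRef): AnyRef = {
  val outers = findOuterObjects(closure)              // 1. find the outer chain
  val accessed = findAccessedFields(closure, outers)  // 2. mark the fields actually accessed
  cloneAndNullOuters(closure, outers, accessed)       // 3. sweep: clone outers, null the rest
}
```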
Outer Objects

For the old, inner class style Scala closures, the "outer objects" are defined as the lexically enclosing closures of the starting closure, plus an optional enclosing REPL line object if these closures are defined in a Scala REPL. All of them are on a singly-linked `$outer` chain.

For the new, "indylambda" style Scala closures, the capturing implementation changed, so closures no longer refer to their enclosing closures via an `$outer` chain. However, a closure can still capture its enclosing REPL line object, much like the old style closures. The name of the field that captures this reference would be `arg$1` (instead of `$outer`).

So what's missing in the `ClosureCleaner` for the "indylambda" support is to find and potentially clone+clean the captured enclosing `this` REPL line object. That's what this PR implements.

Inner Closures

The old, inner class style Scala closures are compiled into separate inner classes, one per lambda body. So in order to discover the implementation (bytecode) of the inner closures, one has to jump over multiple classes. The name of such a class would contain the marker substring `$anonfun$`.

The new, "indylambda" style Scala closures are compiled into static methods in the class where the lambdas were declared. So for lexically nested closures, their lambda bodies would all be compiled into static methods in the same class. This makes it much easier to discover the implementation (bytecode) of the nested lambda bodies. The name of such a static method would contain the marker substring `$anonfun$`.

Discovery of inner closures involves scanning bytecode for certain patterns that represent the creation of a closure object for the inner closure:

- For the inner class style: the closure object creation site looks like `new <InnerClassForTheClosure>(captured args)`.
- For the "indylambda" style: the closure object creation site is compiled into an `invokedynamic` instruction, with its "bootstrap method" pointing to the same one used by Java 8 for its serializable lambdas, and with the bootstrap method arguments pointing to the implementation method.

Does this PR introduce any user-facing change?

Yes. Before this PR, Spark 2.4 / 3.0 / master on Scala 2.12 would not support Scala closures declared in a Scala REPL that capture anything from the REPL line objects. After this PR, such a scenario is supported.

How was this patch tested?

Added a new unit test case to `org.apache.spark.repl.SingletonReplSuite`. The new test case fails without the fix in this PR, and passes with the fix.