[SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation + tests #5685
Conversation
See in-code comments for more detail on what this means.
... in case anything breaks, we should be able to revert to the old behavior.
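For illustration only, a minimal sketch of what feature-flagging the new behavior could look like; the property name and helper methods below are hypothetical, not the actual flag or mechanism introduced by this patch:
```
// Hypothetical sketch only: the flag name and helpers are illustrative,
// not the mechanism this patch actually uses.
def cleanTransitively(closure: AnyRef): Unit = ()    // placeholder: new path
def cleanThisClosureOnly(closure: AnyRef): Unit = () // placeholder: old path

def clean(closure: AnyRef): Unit = {
  val transitive =
    sys.props.getOrElse("spark.closureCleaner.transitive", "true").toBoolean
  if (transitive) cleanTransitively(closure) // new: also clean parent closures
  else cleanThisClosureOnly(closure)         // old: clean only this closure
}
```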
Test build #30930 has finished for PR 5685 at commit
You know a lot more about this than I do, but I was under the impression that the closure cleaner couldn't clean beyond a level or so, because it would then be modifying local object state by nulling fields, and that's not necessarily permissible. I'm sure you're on top of that; just noting my recollection from similar discussions.
@srowen wondering, what do you mean by "local state"? The closure cleaner only nulls out fields in clones of local objects (the nulling mechanism really just creates a clone and then selectively copies certain fields over from the original, leaving the rest unset). So I think it should be fine.
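A rough sketch of that clone-and-copy idea, using plain Java reflection; the helper below is illustrative and simplified (e.g. it assumes a no-arg constructor), not Spark's actual implementation:
```
// Rough sketch: instead of mutating the original object, build a fresh
// instance and copy over only the fields that are actually needed; all
// other fields are left at their default (null) values.
// Illustrative and simplified; not Spark's actual implementation.
def cloneAndNull(obj: AnyRef, neededFields: Set[String]): AnyRef = {
  val cls = obj.getClass
  val clone = cls.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
  for (f <- cls.getDeclaredFields if neededFields.contains(f.getName)) {
    f.setAccessible(true)
    f.set(clone, f.get(obj)) // copy a needed field from the original
  }
  clone
}
```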
```
if (!inInterpreter) {
  // This is a bona fide closure class, whose constructor has no effects
  // other than to set its fields, so use its constructor
  val cons = cls.getConstructors()(0)
  val params = cons.getParameterTypes.map(createNullValue).toArray
  if (outer != null) {
    params(0) = outer // First param is always outer object
    if (enclosingObject!= null) {
```
nit: space
oops thanks
Yeah I'm probably misunderstanding prior discussions, but I had the impression there was a reason you couldn't always clear references two hops away. This would be great to improve if it can be done.
@srowen we cannot null fields in parent closures in place, but the closure cleaner actually clones these parent closures so it's safe to null things out there. Also, we did clear normal references multiple hops away; we just didn't also clear the parent pointers of the enclosing closures because there wasn't a way to determine if these pointers are actually needed (i.e. transitively referenced).
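To illustrate the distinction, a small hedged example (the class and method names are made up): the inner closure transitively needs `path` from the enclosing scope, but not the enclosing object it hangs off of:
```
// Illustrative only: `path` is needed two hops away (via the scope thunk),
// while the reference chain back to the enclosing Outer instance is not.
class Outer {
  def scope[T](body: => T): T = body
  def build(path: String): () => String = scope {
    () => path // transitively references `path`, never touches Outer itself
  }
}
```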
```
@@ -50,7 +50,7 @@ class ClosureCleanerSuite extends FunSuite {
    val obj = new TestClassWithNesting(1)
    assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
  }
```
The suite should be expanded with the fix, right? Could you also test the common `for` loop case that others have seen in the past?

edit: I see your comment says tests are pending; maybe just note that I'd love to see a `for` loop test in particular.
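For reference, a hedged sketch of what such a test might look like, loosely following the suite's style; the class name and numbers are made up:
```
import org.apache.spark.SparkContext

// Hedged sketch, not actual suite code: the closure created in each
// iteration of the `for` loop must be serializable after cleaning, even
// though the enclosing class itself is not Serializable.
class TestClassWithForLoop {
  def run(sc: SparkContext): Int = {
    var sum = 0
    for (i <- 1 to 3) {
      sum += sc.parallelize(1 to 4).map(_ + i).reduce(_ + _)
    }
    sum
  }
}
```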
coming soon
The existing ones are not passing yet because cleaning closures is not idempotent. Idempotency will be added in a future commit.
We need this for tests because we clean the same closure many times there. Outside of tests this is probably not important.
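A minimal sketch of the repeated-cleaning scenario in question; `fakeClean` is a stand-in, not the cleaner's real API:
```
// Illustrative only: tests clean the same closure several times, so a
// second pass must not fail (e.g. on already-nulled outer pointers).
def fakeClean(f: AnyRef): Unit = () // stand-in for ClosureCleaner.clean
val closure = (x: Int) => x + 1
fakeClean(closure)
fakeClean(closure) // idempotent: cleaning an already-clean closure is a no-op
```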
Does this happen in user programs or just in SparkContext? This is exactly what ClosureCleaner was designed to deal with, so I'm surprised that it's a problem.
BTW the general workaround for this kind of stuff is to grab the values from outside the closure into a local val before you call something. I'd look into how to do that before changing the ClosureCleaner too heavily.
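The workaround Matei describes looks roughly like this (the `Service` class is made up for illustration):
```
import org.apache.spark.SparkContext

// Copy the needed value into a local val on the driver so the closure
// captures only that value, not the enclosing object. `Service` is an
// illustrative class, not part of Spark.
class Service(sc: SparkContext) {
  val path = "/some/path"
  def bad() = sc.parallelize(1 to 10).map(_ => path) // captures `this`
  def good() = {
    val localPath = path                             // local copy
    sc.parallelize(1 to 10).map(_ => localPath)      // captures only the val
  }
}
```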
Yes, this also affects user programs. For example, I modified SparkPi to follow the pattern I described in the PR description:
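The exact snippet is not preserved in this thread; below is a hedged reconstruction of the pattern applied to a SparkPi-style computation, reusing the no-op `scope` helper from the PR description:
```
import scala.util.Random
import org.apache.spark.SparkContext

// Hedged reconstruction, not the actual modified SparkPi: wrapping the
// job body in a no-op `scope` closure is what made the inner map closure
// fail to serialize before this patch.
def scope[T](body: => T): T = body
def countInside(sc: SparkContext, n: Int): Int = scope {
  sc.parallelize(1 to n).map { _ =>
    val x = Random.nextDouble() * 2 - 1
    val y = Random.nextDouble() * 2 - 1
    if (x * x + y * y < 1) 1 else 0 // Monte Carlo sample for pi
  }.reduce(_ + _)
}
```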
If you run this on the master branch, you will run into a task-not-serializable exception. I was able to verify that this patch fixes this as long as
The main issue with grabbing the values into local vals is that we'd need to do this everywhere. My intention is to wrap many existing methods (in SparkContext and other files) in closures, and it would be very difficult and manual to localize the variables in all of these places.
Now we keep track of the methods that we visited to avoid visiting the same method twice.
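A simplified sketch of that guard; the traversal below is schematic, not the actual ASM-based method visitor:
```
import scala.collection.mutable

// Schematic sketch of the cycle guard: remember every (class, method)
// pair already visited so mutually recursive methods cannot loop forever.
// Not the actual ASM-based visitor used by ClosureCleaner.
case class MethodId(owner: String, name: String, desc: String)

def visit(
    m: MethodId,
    callees: MethodId => Seq[MethodId],
    visited: mutable.Set[MethodId] = mutable.Set.empty): Unit = {
  if (visited.add(m)) { // add returns false if `m` was already visited
    callees(m).foreach(visit(_, callees, visited))
  }
}
```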
29da296 to e672170 (compare)
Jenkins, test this please?
Test build #700 has started for PR 5685 at commit
Test build #31315 has finished for PR 5685 at commit
```
 * def y = 2
 * scope("two") { println(y + 1) }
 * }
 * def scope(name: String)(body: => Unit) = body
```
maybe move this def up to define it before you use it
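i.e. roughly this ordering (a hedged reconstruction of the doc example):
```
// The suggestion applied to the doc example: define `scope` before use.
def scope(name: String)(body: => Unit) = body
def y = 2
scope("two") { println(y + 1) }
```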
Hey Andrew, this is looking good. The code is quite dense, but as far as I can tell it is correct. I left some more surface-level comments; if you can get to those, I think it will be good to go.
```
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
  if (isClosure(f.getType)) {
    return f.getType :: getOuterClasses(f.get(obj))
```
"f.get(obj)" -> "outer"
oops yes, thanks.
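For reference, a sketch of the snippet with the suggested change applied:
```
// After the fix suggested above: reuse the already-fetched `outer` value
// instead of calling f.get(obj) a second time.
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
  if (isClosure(f.getType)) {
    return f.getType :: getOuterClasses(outer)
```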
@andrewor14 if you can update this, I think this one is good to go.
Test build #31637 has finished for PR 5685 at commit
Test build #31639 has finished for PR 5685 at commit
OK, the latest changes are G2G from my side.
Test build #31642 timed out for PR 5685 at commit
DStream#transform isn't cleaning closures correctly. It is passing an RDD to ClosureCleaner#clean. We should fix this separately outside of this patch.
@andrewor14 okay, makes sense. Thanks for all the hard work on this, Andrew. LGTM pending tests.
Test build #31654 has finished for PR 5685 at commit
[SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation + tests

Note: ~600 lines of this is test code, and ~100 lines documentation.

**[SPARK-7121]** ClosureCleaner does not handle nested closures properly. For instance, in SparkContext, I tried to do the following:
```
def scope[T](body: => T): T = body // no-op
def myCoolMethod(path: String): RDD[String] = scope {
  parallelize(1 to 10).map { _ => path }
}
```
and I got an exception complaining that SparkContext is not serializable. The issue here is that the inner closure is getting its path from the outer closure (the scope), but the outer closure references the SparkContext object itself to get the `parallelize` method. Note, however, that the inner closure doesn't actually need the SparkContext; it just needs a field from the outer closure. If we modify ClosureCleaner to clean the outer closure recursively using only the fields accessed by the inner closure, then we can serialize the inner closure.

**[SPARK-7120]** Also, the other thing is that this file is one of the least understood, partly because it is very low level and was written a long time ago. This patch attempts to change that by adding the missing documentation.

This is blocking my effort on a separate task apache#5729.

Author: Andrew Or <andrew@databricks.com>

Closes apache#5685 from andrewor14/closure-cleaner and squashes the following commits:

cd46230 [Andrew Or] Revert a small change that affected streaming
0bbe77f [Andrew Or] Fix style
ea874bc [Andrew Or] Fix tests
26c5072 [Andrew Or] Address comments
16fbcfd [Andrew Or] Merge branch 'master' of github.com:apache/spark into closure-cleaner
26c7aba [Andrew Or] Revert "In sc.runJob, actually clean the inner closure"
6f75784 [Andrew Or] Revert "Guard against NPE if CC is used outside of an application"
e909a42 [Andrew Or] Guard against NPE if CC is used outside of an application
3998168 [Andrew Or] In sc.runJob, actually clean the inner closure
9187066 [Andrew Or] Merge branch 'master' of github.com:apache/spark into closure-cleaner
d889950 [Andrew Or] Revert "Bypass SerializationDebugger for now (SPARK-7180)"
9419efe [Andrew Or] Bypass SerializationDebugger for now (SPARK-7180)
6d4d3f1 [Andrew Or] Fix scala style?
4aab379 [Andrew Or] Merge branch 'master' of github.com:apache/spark into closure-cleaner
e45e904 [Andrew Or] More minor updates (wording, renaming etc.)
8b71cdb [Andrew Or] Update a few comments
eb127e5 [Andrew Or] Use private method tester for a few things
a3aa465 [Andrew Or] Add more tests for individual closure cleaner operations
e672170 [Andrew Or] Guard against potential infinite cycles in method visitor
6d36f38 [Andrew Or] Fix closure cleaner visibility
2106f12 [Andrew Or] Merge branch 'master' of github.com:apache/spark into closure-cleaner
263593d [Andrew Or] Finalize tests
06fd668 [Andrew Or] Make closure cleaning idempotent
a4866e3 [Andrew Or] Add tests (still WIP)
438c68f [Andrew Or] Minor changes
2390a60 [Andrew Or] Feature flag this new behavior
86f7823 [Andrew Or] Implement transitive cleaning + add missing documentation
Note: ~600 lines of this is test code, and ~100 lines documentation.
[SPARK-7121] ClosureCleaner does not handle nested closures properly. For instance, in SparkContext, I tried to do the following:
```
def scope[T](body: => T): T = body // no-op
def myCoolMethod(path: String): RDD[String] = scope {
  parallelize(1 to 10).map { _ => path }
}
```
and I got an exception complaining that SparkContext is not serializable. The issue here is that the inner closure is getting its path from the outer closure (the scope), but the outer closure references the SparkContext object itself to get the `parallelize` method. Note, however, that the inner closure doesn't actually need the SparkContext; it just needs a field from the outer closure. If we modify ClosureCleaner to clean the outer closure recursively using only the fields accessed by the inner closure, then we can serialize the inner closure.
[SPARK-7120] Also, the other thing is that this file is one of the least understood, partly because it is very low level and was written a long time ago. This patch attempts to change that by adding the missing documentation.
This is blocking my effort on a separate task #5729.