
[SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation + tests #5685


Closed
andrewor14 wants to merge 27 commits into apache:master from andrewor14:closure-cleaner

Conversation

andrewor14
Contributor

Note: ~600 lines of this patch are test code, and ~100 lines are documentation.

[SPARK-7121] ClosureCleaner does not handle nested closures properly. For instance, in SparkContext, I tried to do the following:

```
def scope[T](body: => T): T = body // no-op
def myCoolMethod(path: String): RDD[String] = scope {
  parallelize(1 to 10).map { _ => path }
}
```

and I got an exception complaining that SparkContext is not serializable. The issue here is that the inner closure is getting its `path` from the outer closure (the scope), but the outer closure references the SparkContext object itself to get the `parallelize` method.

Note, however, that the inner closure doesn't actually need the SparkContext; it just needs a field from the outer closure. If we modify ClosureCleaner to clean the outer closure recursively using only the fields accessed by the inner closure, then we can serialize the inner closure.
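
To make that concrete, here is a minimal sketch of the clone-and-retain idea, assuming a hypothetical helper `cloneAndRetain` (illustrative names, not the actual ClosureCleaner API):

```
object TransitiveCleanSketch {
  // Boxed "zero" for primitive constructor parameters, in the spirit of
  // ClosureCleaner's createNullValue helper (simplified: not all primitive
  // types are covered here).
  private def nullValue(t: Class[_]): AnyRef = t match {
    case java.lang.Integer.TYPE => java.lang.Integer.valueOf(0)
    case java.lang.Long.TYPE    => java.lang.Long.valueOf(0L)
    case java.lang.Boolean.TYPE => java.lang.Boolean.FALSE
    case _                      => null
  }

  // Hypothetical helper: clone an outer closure, copying over only the
  // fields the inner closure actually reads and leaving everything else
  // (e.g. a SparkContext reference) null.
  def cloneAndRetain(outer: AnyRef, accessedFields: Set[String]): AnyRef = {
    val cls = outer.getClass
    val cons = cls.getConstructors()(0)
    val args = cons.getParameterTypes.map(nullValue)
    val clone = cons.newInstance(args: _*).asInstanceOf[AnyRef]
    for (f <- cls.getDeclaredFields if accessedFields.contains(f.getName)) {
      f.setAccessible(true)
      f.set(clone, f.get(outer)) // keep only what the inner closure needs
    }
    clone
  }
}
```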

[SPARK-7120] Also, this file is one of the least understood, partly because it is very low level and was written a long time ago. This patch attempts to change that by adding the missing documentation.

This is blocking my effort on a separate task #5729.

Andrew Or added 2 commits April 24, 2015 03:05
See in-code comments for more detail on what this means.
... in case anything breaks, we should be able to resort to the old behavior.
@SparkQA

SparkQA commented Apr 24, 2015

Test build #30930 has finished for PR 5685 at commit 2390a60.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • * class SomethingNotSerializable
    • logDebug(s" + cloning the object $obj of class $
    • class FieldAccessFinder(
  • This patch does not change any dependencies.

@srowen
Member

srowen commented Apr 24, 2015

You know a lot more about this than I do, but I was under the impression that the closure cleaner couldn't clean beyond a level or so, because it would then be modifying local object state by nulling fields in them, and that's not necessarily permissible. I'm sure you're on top of that, just noting my recollection from similar discussions.

@pwendell
Contributor

@srowen wondering, what do you mean by "local state"? The closure cleaner only nulls out fields in clones of local objects (actually the nulling mechanism is really just that it creates a clone and then selectively copies over certain fields from the original and neglects others). So I think it should be fine.
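
A toy illustration of that point (plain Scala, not ClosureCleaner code): the nulling happens on a fresh clone, so the original object's state is never mutated.

```
// Stand-ins for an outer closure's captured state: a non-serializable
// reference (sc) and the one field the inner closure actually needs (path).
class Outer(var sc: AnyRef, var path: String)

val orig  = new Outer(new Object, "/tmp/data")
val clone = new Outer(null, null) // fresh instance, everything starts null
clone.path = orig.path            // selectively copy only the needed field

assert(orig.sc != null)           // the original object is never mutated
assert(clone.sc == null)          // the clone drops the unneeded reference
```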

```
if (!inInterpreter) {
  // This is a bona fide closure class, whose constructor has no effects
  // other than to set its fields, so use its constructor
  val cons = cls.getConstructors()(0)
  val params = cons.getParameterTypes.map(createNullValue).toArray
  if (outer != null) {
    params(0) = outer // First param is always outer object
    if (enclosingObject!= null) {
```
Contributor

nit: space

Contributor Author

oops thanks

@srowen
Member

srowen commented Apr 24, 2015

Yeah I'm probably misunderstanding prior discussions, but I had the impression there was a reason you couldn't always clear references two hops away. This would be great to improve if it can be done.

@andrewor14
Contributor Author

@srowen we cannot null fields in parent closures in place, but the closure cleaner actually clones these parent closures so it's safe to null things out there. Also, we did clear normal references multiple hops away; we just didn't also clear the parent pointers of the enclosing closures because there wasn't a way to determine if these pointers are actually needed (i.e. transitively referenced).
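
Determining which fields are actually needed means scanning the closure's bytecode. Here is a simplified sketch in the spirit of the FieldAccessFinder this patch adds, written against plain `org.objectweb.asm` (Spark shades ASM, and the real visitor also follows invoked methods transitively, which this sketch does not):

```
import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Opcodes}
import scala.collection.mutable

// Record every field a closure class reads via GETFIELD.
def findAccessedFields(cls: Class[_]): mutable.Set[String] = {
  val accessed = mutable.Set[String]()
  val finder = new ClassVisitor(Opcodes.ASM5) {
    override def visitMethod(access: Int, name: String, desc: String,
        sig: String, exceptions: Array[String]): MethodVisitor = {
      new MethodVisitor(Opcodes.ASM5) {
        override def visitFieldInsn(
            op: Int, owner: String, field: String, fdesc: String): Unit = {
          if (op == Opcodes.GETFIELD) {
            accessed += field
          }
        }
      }
    }
  }
  // Load the class file bytes for the closure and walk them.
  val resource = cls.getName.replace('.', '/') + ".class"
  val in = cls.getClassLoader.getResourceAsStream(resource)
  try new ClassReader(in).accept(finder, 0) finally in.close()
  accessed
}
```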

```
@@ -50,7 +50,7 @@ class ClosureCleanerSuite extends FunSuite {
  val obj = new TestClassWithNesting(1)
  assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
}
```

Contributor

The suite should be expanded with the fix, right? Could you also test the common for loop case that others have seen in the past?

edit: I see your comment says tests are pending; maybe just note that I'd love to see a for loop test in particular.

Contributor Author

coming soon

Andrew Or added 3 commits April 24, 2015 16:18
The existing ones are not passing yet because cleaning closures is not idempotent. This will be added in a future commit.
We need this for tests because we clean the same closure many times there. Outside of tests this is probably not important.
@mateiz
Contributor

mateiz commented Apr 24, 2015

Does this happen in user programs or just in SparkContext? This is exactly what ClosureCleaner was designed to deal with, so I'm surprised that it's a problem.

@mateiz
Contributor

mateiz commented Apr 25, 2015

BTW the general workaround for this kind of stuff is to grab the values from outside the closure into a local val before you call something. I'd look into how to do that before changing the ClosureCleaner too heavily.
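
The usual form of that workaround, as a brief sketch (the `MyApp` class here is illustrative):

```
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// MyApp itself is not serializable because it holds a SparkContext.
class MyApp(sc: SparkContext) {
  val factor = 2

  def scaled(): RDD[Int] = {
    // Grab the value into a local val first. Without this, the closure
    // below would capture `this` (all of MyApp, including the
    // non-serializable SparkContext) just to read `factor`.
    val localFactor = factor
    sc.parallelize(1 to 10).map(_ * localFactor)
  }
}
```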

@andrewor14
Contributor Author

Yes, this also affects user programs. For example, I modified SparkPi to follow the pattern I described in the PR description:

```
    ...
    val slices = if (args.length > 0) args(0).toInt else 2
    // (n is derived from slices earlier in SparkPi; elided here)
    (1 to 1).foreach { j =>
      val count = spark.parallelize(1 until n, slices).map { i =>
        val z = slices // *** This is the culprit ***
        val x = random * 2 - 1
        val y = random * 2 - 1
        if (x * x + y * y < 1) 1 else 0
      }.reduce(_ + _)
    }
    ...
```

If you run this on the master branch, you will run into a `Task not serializable` exception. I was able to verify that this patch fixes this as long as spark.closureCleaner.transitive is enabled. If this is not enabled, it fails with the same exception as before.
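
A sketch of opting in, assuming the flag is read as an ordinary Spark configuration property:

```
import org.apache.spark.{SparkConf, SparkContext}

// Enable the new transitive cleaning behavior introduced by this patch.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkPi")
  .set("spark.closureCleaner.transitive", "true")
val spark = new SparkContext(conf)
```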

@andrewor14
Contributor Author

The main issue with grabbing the values into local vals is that we'd need to do this everywhere. My intention is to wrap many existing methods (in SparkContext and other files) in closures, and it would be very tedious and manual to localize the variables in all of these places.

@andrewor14 andrewor14 changed the title [SPARK-7120][SPARK-7121][WIP] Closure cleaner nesting + documentation [SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation Apr 25, 2015
Now we keep track of the methods that we visited to avoid visiting the same method twice.
@andrewor14 andrewor14 changed the title [SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation [SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation + tests Apr 25, 2015
@andrewor14
Contributor Author

Jenkins, test this please?

@SparkQA

SparkQA commented Apr 25, 2015

Test build #700 has started for PR 5685 at commit eb127e5.

@SparkQA

SparkQA commented Apr 29, 2015

Test build #31315 has finished for PR 5685 at commit 26c7aba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • * class SomethingNotSerializable
    • logDebug(s" + cloning the object $obj of class $
  • This patch does not change any dependencies.

```
 * def y = 2
 * scope("two") { println(y + 1) }
 * }
 * def scope(name: String)(body: => Unit) = body
```
Contributor

maybe move this def up to define it before you use it

@pwendell
Contributor

Hey Andrew,

This is looking good. The code is quite dense, but as far as I can tell it is correct. I left some more surface-level comments; if you can get to those, I think it will be good to go.

```
// The outer pointer may be null if we have cleaned this closure before
if (outer != null) {
  if (isClosure(f.getType)) {
    return f.getType :: getOuterClasses(f.get(obj))
```
Member

"f.get(obj)" -> "outer"

Contributor Author

oops yes, thanks.

@pwendell
Contributor

pwendell commented May 1, 2015

@andrewor14 if you can update this I think this one is good to go.

@SparkQA

SparkQA commented May 2, 2015

Test build #31637 has finished for PR 5685 at commit 26c5072.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • * class SomethingNotSerializable
    • logDebug(s" + cloning the object $obj of class $

@SparkQA

SparkQA commented May 2, 2015

Test build #31639 has finished for PR 5685 at commit ea874bc.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • * class SomethingNotSerializable
    • logDebug(s" + cloning the object $obj of class $

@andrewor14
Contributor Author

OK, the latest changes are good to go from my side.

@SparkQA

SparkQA commented May 2, 2015

Test build #31642 timed out for PR 5685 at commit 0bbe77f after a configured wait of 150m.

DStream#transform isn't cleaning closures correctly. It is passing an RDD to ClosureCleaner#clean. We should fix this separately outside of this patch.
@andrewor14
Contributor Author

It turns out that in streaming we currently pass an RDD into ClosureCleaner#clean, so we can't do the type-safety check that you suggested, @pwendell. I will fix this separately in #5860.
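
For reference, a sketch of what such a guard could look like (the `verifyCleanable` helper is hypothetical; the actual check was deferred to the follow-up PR):

```
object CleanerGuard {
  // Scala 2.10/2.11 anonymous function classes have "$anonfun$" in their
  // names, which is how closures can be recognized at runtime.
  def isClosure(cls: Class[_]): Boolean = cls.getName.contains("$anonfun$")

  // Refuse to "clean" anything that is not actually a closure,
  // e.g. an RDD passed in by mistake.
  def verifyCleanable(func: AnyRef): Unit = {
    if (!isClosure(func.getClass)) {
      throw new IllegalArgumentException(
        s"Expected a closure; got ${func.getClass.getName}")
    }
  }
}
```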

@pwendell
Contributor

pwendell commented May 2, 2015

@andrewor14 okay, makes sense. Thanks for all the hard work on this, Andrew. LGTM pending tests.

@SparkQA

SparkQA commented May 2, 2015

Test build #31654 has finished for PR 5685 at commit cd46230.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • * class SomethingNotSerializable
    • logDebug(s" + cloning the object $obj of class $

@asfgit asfgit closed this in 7394e7a May 2, 2015
@andrewor14 andrewor14 deleted the closure-cleaner branch May 2, 2015 07:16
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
[SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation + tests


Author: Andrew Or <andrew@databricks.com>

Closes apache#5685 from andrewor14/closure-cleaner and squashes the following commits:

cd46230 [Andrew Or] Revert a small change that affected streaming
0bbe77f [Andrew Or] Fix style
ea874bc [Andrew Or] Fix tests
26c5072 [Andrew Or] Address comments
16fbcfd [Andrew Or] Merge branch 'master' of github.com:apache/spark into closure-cleaner
26c7aba [Andrew Or] Revert "In sc.runJob, actually clean the inner closure"
6f75784 [Andrew Or] Revert "Guard against NPE if CC is used outside of an application"
e909a42 [Andrew Or] Guard against NPE if CC is used outside of an application
3998168 [Andrew Or] In sc.runJob, actually clean the inner closure
9187066 [Andrew Or] Merge branch 'master' of github.com:apache/spark into closure-cleaner
d889950 [Andrew Or] Revert "Bypass SerializationDebugger for now (SPARK-7180)"
9419efe [Andrew Or] Bypass SerializationDebugger for now (SPARK-7180)
6d4d3f1 [Andrew Or] Fix scala style?
4aab379 [Andrew Or] Merge branch 'master' of github.com:apache/spark into closure-cleaner
e45e904 [Andrew Or] More minor updates (wording, renaming etc.)
8b71cdb [Andrew Or] Update a few comments
eb127e5 [Andrew Or] Use private method tester for a few things
a3aa465 [Andrew Or] Add more tests for individual closure cleaner operations
e672170 [Andrew Or] Guard against potential infinite cycles in method visitor
6d36f38 [Andrew Or] Fix closure cleaner visibility
2106f12 [Andrew Or] Merge branch 'master' of github.com:apache/spark into closure-cleaner
263593d [Andrew Or] Finalize tests
06fd668 [Andrew Or] Make closure cleaning idempotent
a4866e3 [Andrew Or] Add tests (still WIP)
438c68f [Andrew Or] Minor changes
2390a60 [Andrew Or] Feature flag this new behavior
86f7823 [Andrew Or] Implement transitive cleaning + add missing documentation
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
[SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation + tests
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
[SPARK-7120][SPARK-7121] Closure cleaner nesting + documentation + tests