[SPARK-6886] [PySpark] fix big closure with shuffle #5496

Closed
davies wants to merge 1 commit into master from davies:big_closure

Conversation

davies (Contributor) commented Apr 13, 2015

Currently, the created broadcast object has the same life cycle as the RDD in Python. For multi-stage jobs, a PythonRDD will be created in the JVM while the RDD in Python may be GCed, so the broadcast can be destroyed in the JVM before the PythonRDD is done with it.

This PR changes this to use the PythonRDD to track the lifecycle of the broadcast object. It also refactors getNumPartitions() to avoid unnecessary creation of a PythonRDD, which could be heavy.
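
For context, the getNumPartitions() refactor amounts to something like the following paraphrased sketch (based on this description, not a verbatim diff):

    # On a pipelined RDD, touching self._jrdd forces creation of a
    # PythonRDD in the JVM; counting partitions does not need that, so
    # ask the upstream JavaRDD handle directly.
    def getNumPartitions(self):
        return self._prev_jrdd.partitions().size()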

cc @JoshRosen

SparkQA commented Apr 13, 2015

Test build #30192 has finished for PR 5496 at commit 9a0ea4c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Assignment(id: Long, cluster: Int)
  • This patch does not change any dependencies.

JoshRosen (Contributor)

This fix looks good to me. There are two contributors to the bug here:

  1. In certain Python-driver-side operations, like groupBy(), we create RDDs that implicitly reference the previous RDD via its JavaRDD rather than by holding an explicit reference to the parent Python RDD object. This can result in the Python driver's RDD object being garbage-collected even though the Java PythonRDD object sticks around, due to the reference from a child RDD. To see this more clearly, note that there are places in rdd.py where we call RDD(self._jrdd, ...) without actually storing a reference to self in the new derived RDD (see the sketch after this list).
  2. When an RDD is garbage-collected in the Python driver, its __del__ method in rdd.py manually unpersists the broadcast created for its closure even though there are still references to it.
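
To make contributor (1) concrete, here is a minimal sketch of that reference pattern (illustrative only, not a verbatim excerpt from rdd.py):

    # `child` is built from the parent's JVM handle alone; nothing on
    # `child` keeps the Python `parent` object itself alive.
    child = RDD(parent._jrdd, parent.ctx, parent._jrdd_deserializer)

    # Python's GC is now free to collect `parent` (running its __del__),
    # even though the JVM-side PythonRDD stays reachable through the
    # child's lineage.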

This problem only manifests itself if Python closures are very large (large enough to trip the 1MB threshold which causes us to broadcast them) and are referenced by intermediate Python RDDs that are garbage-collected.
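
To make the failure mode concrete, a minimal repro along these lines (illustrative; assumes an active SparkContext sc, with sizes chosen only to exceed the 1MB threshold) would be:

    # The lambda captures `payload`, so the pickled closure exceeds 1MB
    # and is shipped as a broadcast; groupByKey() introduces a shuffle,
    # making this a multi-stage job whose intermediate Python RDD can be
    # garbage-collected.
    payload = [float(i) for i in range(200000)]

    result = (sc.parallelize(range(100), 4)
                .map(lambda x: (x % 3, x + len(payload)))
                .groupByKey()
                .count())
    print(result)  # pre-fix, this could fail once the broadcast was destroyed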

The fix implemented in this patch is to remove that __del__ call and leave it up to ContextCleaner to manage the Broadcast cleanup (sketched after the list below). To verify that this won't introduce memory leaks, I took a look at how we track references to the Python broadcast in branch-1.2:

  • When we create a Broadcast object in Python, we capture a reference to the Java broadcast:
    self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
    Aside from this reference, we don't keep any other references to the Java broadcast inside of the Python driver.
  • We do not maintain a Python-driver-side registry of these Python broadcast objects, so I don't think we have to worry about leaked references keeping the Python broadcast alive and preventing the Java broadcast from being garbage-collected.
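
A paraphrased before/after sketch of the change, based on the description above (illustrative, not the exact branch-1.2 source):

    # Before: PipelinedRDD.__del__ in rdd.py eagerly unpersisted the
    # broadcast holding its closure as soon as the Python RDD wrapper was
    # collected, even though the JVM-side PythonRDD could still need it.
    class PipelinedRDD(RDD):
        def __del__(self):
            if self._broadcast:
                self._broadcast.unpersist()  # removed by this patch
                self._broadcast = None

    # After: no __del__ here; once nothing references the broadcast,
    # Spark's ContextCleaner reclaims it at a safe time.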

An alternative fix would be to prevent Python-side RDDs from being garbage-collected until their corresponding Java RDDs are no longer referenced, but even if we did make this change I think it's a good idea to leave the cleaning of broadcasts to Spark's ContextCleaner rather than trying to manage it here.
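
That alternative would look roughly like this (a hypothetical sketch; _parent_rdd is an invented attribute, not actual PySpark code):

    # Hypothetical: pin the Python parent so its lifetime matches the
    # JVM-side lineage.
    child = RDD(parent._jrdd, parent.ctx, parent._jrdd_deserializer)
    child._parent_rdd = parent  # keeps `parent` alive as long as `child` is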

I tested this out manually in an IPython REPL and broadcasts seem to be cleaned up at the right times.
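
A manual check of the kind described might look like this in a REPL (illustrative only; assumes a live SparkContext sc):

    import gc

    big = [float(i) for i in range(200000)]  # >1MB pickled, so the closure is broadcast
    rdd = sc.parallelize(range(10)).map(lambda x: (x % 2, x + len(big)))
    grouped = rdd.groupByKey()

    del rdd       # drop the intermediate Python RDD object
    gc.collect()  # pre-fix: this ran __del__ and unpersisted the broadcast too early

    print(grouped.count())  # with the fix, the broadcast survives and this succeeds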

Since this looks good to me, I'm going to merge this into master (1.4.0) and branch-1.3 (1.3.2). I'll try to cherry-pick into branch-1.2 (1.2.3), but we may have to open a separate PR if I can't fix the conflicts easily. Thanks @davies for fixing this!

asfgit pushed a commit that referenced this pull request Apr 15, 2015
Currently, the created broadcast object has the same life cycle as the RDD in Python. For multi-stage jobs, a PythonRDD will be created in the JVM while the RDD in Python may be GCed, so the broadcast can be destroyed in the JVM before the PythonRDD is done with it.

This PR changes this to use the PythonRDD to track the lifecycle of the broadcast object. It also refactors getNumPartitions() to avoid unnecessary creation of a PythonRDD, which could be heavy.

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #5496 from davies/big_closure and squashes the following commits:

9a0ea4c [Davies Liu] fix big closure with shuffle

(cherry picked from commit f11288d)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
asfgit closed this in f11288d Apr 15, 2015
JoshRosen (Contributor)

Finished the 1.2 backport as well.

asfgit pushed a commit that referenced this pull request Apr 15, 2015
Currently, the created broadcast object has the same life cycle as the RDD in Python. For multi-stage jobs, a PythonRDD will be created in the JVM while the RDD in Python may be GCed, so the broadcast can be destroyed in the JVM before the PythonRDD is done with it.

This PR changes this to use the PythonRDD to track the lifecycle of the broadcast object. It also refactors getNumPartitions() to avoid unnecessary creation of a PythonRDD, which could be heavy.

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #5496 from davies/big_closure and squashes the following commits:

9a0ea4c [Davies Liu] fix big closure with shuffle

Conflicts:
	python/pyspark/rdd.py
davies (Contributor, Author) commented Apr 15, 2015

@JoshRosen Thanks!
