[SPARK-6886] [PySpark] fix big closure with shuffle #5496
Conversation
Test build #30192 has finished for PR 5496 at commit
This fix looks good to me. There are two contributors to the bug here:
This problem only manifests itself if Python closures are very large (large enough to trip the 1MB threshold which causes us to broadcast them) and are referenced by intermediate Python RDDs that are garbage-collected. The fix implemented in this patch is to remove this
An alternative fix would be to prevent Python-side RDDs from being garbage-collected until their corresponding Java RDDs are no longer referenced, but even if we made that change, I think it's a good idea to leave the cleaning of broadcasts to Spark's ContextCleaner rather than trying to manage it here. I tested this out manually in an IPython REPL and broadcasts seem to be cleaned up at the right times. Since this looks good to me, I'm going to merge this in.
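The GC hazard described above can be illustrated without Spark at all. The sketch below is a pure-Python analogy, not Spark's actual internals: the class names (`FakeBroadcast`, `PyRDD`, `JvmRDD`) are hypothetical stand-ins for the pre-patch arrangement, where the short-lived Python RDD's cleanup destroyed a broadcast that the longer-lived JVM side still needed.

```python
# Pure-Python sketch (no Spark required) of the pre-patch lifecycle bug:
# the broadcast is destroyed when the intermediate Python RDD is
# garbage-collected, even though a longer-lived consumer still needs it.
# All names here are illustrative, not actual Spark internals.
import gc

destroyed = []

class FakeBroadcast:
    def __init__(self, name):
        self.name = name
    def destroy(self):
        destroyed.append(self.name)

class PyRDD:
    """Stands in for the Python-side RDD that owned the broadcast pre-patch."""
    def __init__(self, broadcast):
        self.broadcast = broadcast
    def __del__(self):
        # Pre-patch behavior: cleaning up the Python RDD destroys the broadcast.
        self.broadcast.destroy()

class JvmRDD:
    """Stands in for the JVM-side PythonRDD that still needs the broadcast."""
    def __init__(self, broadcast):
        self.broadcast = broadcast
    def compute(self):
        # Conceptually fails if the broadcast was destroyed too early.
        return self.broadcast.name not in destroyed

bc = FakeBroadcast("closure")
jvm_rdd = JvmRDD(bc)
py_rdd = PyRDD(bc)
del py_rdd          # intermediate Python RDD is garbage-collected...
gc.collect()
print(jvm_rdd.compute())  # False: broadcast destroyed before the JVM side is done
```

In a multi-stage PySpark job this corresponds to an intermediate Python RDD going out of scope while the JVM is still running stages that reference the broadcast closure.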
Currently, the created broadcast object has the same life cycle as the RDD in Python. For multi-stage jobs, a PythonRDD will be created in the JVM while the RDD in Python may be GCed, so the broadcast would be destroyed in the JVM before the PythonRDD. This PR changes PySpark to use the PythonRDD to track the lifecycle of the broadcast object. It also refactors getNumPartitions() to avoid unnecessary creation of a PythonRDD, which could be heavy.

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #5496 from davies/big_closure and squashes the following commits:

9a0ea4c [Davies Liu] fix big closure with shuffle

(cherry picked from commit f11288d)

Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Finished the 1.2 backport as well. |
Currently, the created broadcast object has the same life cycle as the RDD in Python. For multi-stage jobs, a PythonRDD will be created in the JVM while the RDD in Python may be GCed, so the broadcast would be destroyed in the JVM before the PythonRDD. This PR changes PySpark to use the PythonRDD to track the lifecycle of the broadcast object. It also refactors getNumPartitions() to avoid unnecessary creation of a PythonRDD, which could be heavy.

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #5496 from davies/big_closure and squashes the following commits:

9a0ea4c [Davies Liu] fix big closure with shuffle

Conflicts:
	python/pyspark/rdd.py
@JoshRosen Thanks! |
Currently, the created broadcast object has the same life cycle as the RDD in Python. For multi-stage jobs, a PythonRDD will be created in the JVM while the RDD in Python may be GCed, so the broadcast would be destroyed in the JVM before the PythonRDD.

This PR changes PySpark to use the PythonRDD to track the lifecycle of the broadcast object. It also refactors getNumPartitions() to avoid unnecessary creation of a PythonRDD, which could be heavy.
cc @JoshRosen
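The ownership transfer the PR describes can be sketched in the same Spark-free analogy: tie the broadcast's destruction to the long-lived side rather than to the short-lived Python RDD. The class names below (`FakeBroadcast`, `PythonRDDOwner`, `PyRDD`) are hypothetical stand-ins, not Spark's actual classes; the point is only the lifecycle pattern.

```python
# Pure-Python sketch (no Spark required) of the fix's ownership transfer:
# the long-lived owner, not the short-lived Python RDD, controls the
# broadcast's lifecycle, so GC of the Python RDD no longer destroys it.
# All names here are illustrative, not actual Spark internals.
import gc

destroyed = []

class FakeBroadcast:
    def __init__(self, name):
        self.name = name
    def destroy(self):
        destroyed.append(self.name)

class PythonRDDOwner:
    """Post-patch: the JVM-side PythonRDD tracks the broadcast's lifecycle."""
    def __init__(self, broadcast):
        self.broadcast = broadcast
    def __del__(self):
        self.broadcast.destroy()

class PyRDD:
    """Post-patch: the Python RDD merely references the broadcast."""
    def __init__(self, broadcast):
        self.broadcast = broadcast
    # No __del__: garbage-collecting this object leaves the broadcast alone.

bc = FakeBroadcast("closure")
py_rdd = PyRDD(bc)
owner = PythonRDDOwner(bc)
del py_rdd           # Python RDD collected early, as in a multi-stage job
gc.collect()
early = list(destroyed)   # still empty: broadcast survives
del owner            # long-lived side finishes; cleanup happens now
gc.collect()
print(early, destroyed)   # [] ['closure']
```

This mirrors the patch's intent: broadcast cleanup follows the object whose lifetime actually matches the computation, leaving final reclamation to Spark's ContextCleaner on the JVM side.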