[SPARK-4834] [standalone] Clean up application files after app finishes. #3705
Conversation
Commit 7aacb7b added support for sharing downloaded files among multiple executors of the same app. That works great in Yarn, since the app's directory is cleaned up after the app is done. But Spark standalone mode didn't do that, so the lock/cache files created by that change were left around and could eventually fill up the disk hosting /tmp.

To solve that, create app-specific directories under the local dirs when launching executors. Multiple executors launched by the same Worker will use the same app directories, so they should be able to share the downloaded files. When the application finishes, a new message is sent to all workers telling them the application has finished; once that message has been received, and all executors registered for the application have shut down, those directories will be cleaned up by the Worker.

Note 1: Unit testing this is hard (if even possible), since local-cluster mode doesn't seem to leave the Master/Worker daemons running long enough after `sc.stop()` is called for the cleanup protocol to take effect.

Note 2: the code tracking finished apps / app directories in Master.scala and Worker.scala is not really thread-safe, but then the code that modifies other shared maps in those classes isn't either, so this change is not making anything worse.
I tested this on a standalone cluster; verified that the log message shows up in Worker logs and that the temp files are not left behind in /tmp.
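To make the cleanup protocol described above concrete, here is a minimal sketch of the Worker-side bookkeeping it implies. All names (`AppDirCleanup`, `dirsForExecutor`, and so on) are hypothetical and only illustrate the idea; this is not the actual Worker.scala code.

```scala
import java.io.File
import scala.collection.mutable

// Hypothetical sketch of the per-application directory bookkeeping on a Worker.
class AppDirCleanup(localDirs: Seq[String]) {
  // appId -> directories created for that app on this worker
  private val appDirectories = mutable.HashMap.empty[String, Seq[File]]
  // apps the master has told us are finished
  private val finishedApps = mutable.HashSet.empty[String]
  // appId -> number of executors still running for that app
  private val runningExecutors = mutable.HashMap.empty[String, Int].withDefaultValue(0)

  /** Called when launching an executor: reuse (or create) per-app dirs under each local dir. */
  def dirsForExecutor(appId: String): Seq[File] = synchronized {
    runningExecutors(appId) += 1
    appDirectories.getOrElseUpdate(appId, localDirs.map { d =>
      val dir = new File(d, appId)
      dir.mkdirs()
      dir
    })
  }

  /** Called when the master sends the "application finished" message. */
  def applicationFinished(appId: String): Unit = synchronized {
    finishedApps += appId
    maybeCleanup(appId)
  }

  /** Called when an executor for the app exits. */
  def executorExited(appId: String): Unit = synchronized {
    runningExecutors(appId) -= 1
    maybeCleanup(appId)
  }

  // Delete only once the app is finished *and* no executors for it remain.
  private def maybeCleanup(appId: String): Unit = {
    if (finishedApps.contains(appId) && runningExecutors(appId) <= 0) {
      appDirectories.remove(appId).foreach(_.foreach(deleteRecursively))
      finishedApps -= appId
      runningExecutors -= appId
    }
  }

  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) f.listFiles().foreach(deleteRecursively)
    f.delete()
  }
}
```

The key point is the two-condition check: directories are removed only after the master has reported the application finished and the last executor for it has exited on that worker.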
Test build #24470 has started for PR 3705 at commit
Test build #24470 has finished for PR 3705 at commit
Test PASSed.
I have a testing framework that can reliably test these sorts of scenarios; I'm planning to open-source it once I've cleaned up and reorganized its code.
Yeah, the code there is due for a thread-safety audit / cleanup. A lot of the fields in those files have unnecessarily broad visibility, in many cases because they're read in the web UI.
// Application finished message, used for cleanup
case class ApplicationFinished(id: String)
Maybe this should be under the `// Master to Worker` heading in this file?
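For context, the suggested placement would look roughly like this; the surrounding contents of DeployMessages.scala are omitted, and this is only an illustration, not the actual file:

```scala
// Master to Worker

case class ApplicationFinished(id: String)
```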
Regarding thread-safety, it looks like
It wasn't very clear to me in which cases messages could be processed in parallel using akka from my last read of the documentation, but I guess I'm due for a second reading. :-) I'll double-check that I'm not breaking that assumption at least.
Test build #24716 has started for PR 3705 at commit
re: thread-safety, the code I added could definitely be triggered from different actors, so I added some synchronization. For the
Yeah, this is kind of hard to find, but it's described in the Actors and the Java Memory Model section:
To get parallelism, the Akka solution is to use multiple actors. That said, people do mix the actor model with other concurrency models, so it pays to be careful here.
Do you mean the
Hmm, I'm not so sure. From the page you linked:
I read that sentence as: "messages from the same sender are processed in order", not "each receiver will only process one message at a time". Anyway, in these cases I tend to err on the side of safety; the extra synchronization here shouldn't cause any performance issues.
Test build #24716 has finished for PR 3705 at commit
Test PASSed.
@vanzin You need to read some more about Akka actors. The fundamental abstraction is that an actor has a message queue, the only way to communicate with the actor is via messages, those messages end up in the queue, and they are handled one at a time in the actor's `receive` block.
(lack of) thread-safety issues aside, this PR looks good to me. It makes sense to have application-specific directories now that we have state that is tied to an application's lifecycle (lock files, file caches, etc.) rather than individual executors' lifecycles.
@markhamstra that sounds a little limiting (forcing message handling to be effectively single threaded), but that's an Akka issue, not a Spark issue. :-) Thanks for the pointer, though.
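For readers unfamiliar with the model being discussed, here is a tiny, self-contained Akka (classic) example, not taken from Spark and with made-up names, showing that an actor's `receive` handles queued messages one at a time, so state touched only inside `receive` needs no extra locking:

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical actor, for illustration only (not Spark code).
class FinishedAppsTracker extends Actor {
  // Safe without locks: only read/written inside receive, and the actor
  // processes one message from its mailbox at a time.
  private var finishedApps = Set.empty[String]

  def receive: Receive = {
    case appId: String =>
      finishedApps += appId
      println(s"Recorded ${finishedApps.size} finished application(s)")
  }
}

object TrackerDemo extends App {
  val system = ActorSystem("demo")
  val tracker = system.actorOf(Props[FinishedAppsTracker], "tracker")

  // Sends are asynchronous; the messages are queued and handled sequentially.
  tracker ! "app-20141219-0001"
  tracker ! "app-20141219-0002"

  Thread.sleep(500)   // crude: give the actor time to drain its mailbox
  system.terminate()  // Akka 2.4+; older versions used system.shutdown()
}
```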
Do you think we should remove the synchronization?
Sure, I can do that.
Test build #24742 has started for PR 3705 at commit
Test build #24742 has finished for PR 3705 at commit
Test FAILed.
That most recent failure is due to a known flaky test, `org.apache.spark.streaming.CheckpointSuite.recovery with file input stream`, so I'm not going to make Jenkins re-test this. This looks good to me, so I'm going to merge it into `master`.
Commit 7aacb7b added support for sharing downloaded files among multiple executors of the same app. That works great in Yarn, since the app's directory is cleaned up after the app is done. But Spark standalone mode didn't do that, so the lock/cache files created by that change were left around and could eventually fill up the disk hosting /tmp.

To solve that, create app-specific directories under the local dirs when launching executors. Multiple executors launched by the same Worker will use the same app directories, so they should be able to share the downloaded files. When the application finishes, a new message is sent to all workers telling them the application has finished; once that message has been received, and all executors registered for the application shut down, then those directories will be cleaned up by the Worker.

Note: Unit testing this is hard (if even possible), since local-cluster mode doesn't seem to leave the Master/Worker daemons running long enough after `sc.stop()` is called for the clean up protocol to take effect.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #3705 from vanzin/SPARK-4834 and squashes the following commits:

b430534 [Marcelo Vanzin] Remove seemingly unnecessary synchronization.
50eb4b9 [Marcelo Vanzin] Review feedback.
c0e5ea5 [Marcelo Vanzin] [SPARK-4834] [standalone] Clean up application files after app finishes.

(cherry picked from commit dd15536)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>