
[SPARK-4834] [standalone] Clean up application files after app finishes. #3705

Closed
wants to merge 3 commits into apache:master from vanzin:SPARK-4834

Conversation

vanzin
Contributor

@vanzin vanzin commented Dec 15, 2014

Commit 7aacb7b added support for sharing downloaded files among multiple
executors of the same app. That works great in Yarn, since the app's directory
is cleaned up after the app is done.

But Spark standalone mode didn't do that, so the lock/cache files created
by that change were left around and could eventually fill up the disk hosting
/tmp.

To solve that, create app-specific directories under the local dirs when
launching executors. Multiple executors launched by the same Worker will
use the same app directories, so they should be able to share the downloaded
files. When the application finishes, a new message is sent to all workers
telling them the application has finished; once that message has been received
and all executors registered for the application have shut down, those
directories will be cleaned up by the Worker.

Note 1: Unit testing this is hard (if even possible), since local-cluster mode
doesn't seem to leave the Master/Worker daemons running long enough after
`sc.stop()` is called for the clean up protocol to take effect.

Note 2: the code tracking finished apps / app directories in Master.scala
and Worker.scala is not really thread-safe, but then the code that modifies
other shared maps in those classes isn't either, so this change is not making
anything worse.
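For a concrete picture, here is a minimal sketch of the Worker-side bookkeeping described above. It is illustrative only: `ApplicationFinished`, `appDirectories`, and `maybeCleanupApplication` are names used by this change, while the class name and the `finishedApps`/`runningExecutors` fields are invented for the example and do not reflect the actual Worker.scala code.

```scala
import java.io.File
import scala.collection.mutable

// Illustrative sketch -- not the actual Worker.scala code.
class WorkerCleanupSketch(localDirs: Seq[String]) {
  // appId -> app-specific directories created under this worker's local dirs
  private val appDirectories = mutable.HashMap.empty[String, Seq[File]]
  // appIds for which the Master has already sent ApplicationFinished
  private val finishedApps = mutable.HashSet.empty[String]
  // appId -> number of this worker's executors still running for the app
  private val runningExecutors = mutable.HashMap.empty[String, Int]

  // Called when launching an executor; all executors of the same app launched
  // by this worker share the same directories.
  def createAppDirs(appId: String): Seq[File] = {
    runningExecutors(appId) = runningExecutors.getOrElse(appId, 0) + 1
    appDirectories.getOrElseUpdate(appId, localDirs.map { dir =>
      val d = new File(dir, appId)
      d.mkdirs()
      d
    })
  }

  // Master -> Worker: the application has finished.
  def onApplicationFinished(appId: String): Unit = {
    finishedApps += appId
    maybeCleanupApplication(appId)
  }

  // One of this worker's executors for the app has exited.
  def onExecutorExited(appId: String): Unit = {
    runningExecutors(appId) = runningExecutors.getOrElse(appId, 1) - 1
    maybeCleanupApplication(appId)
  }

  // Delete the app's directories only once the app has finished *and* no
  // executors registered for it remain on this worker.
  private def maybeCleanupApplication(appId: String): Unit = {
    if (finishedApps.contains(appId) && runningExecutors.getOrElse(appId, 0) <= 0) {
      appDirectories.remove(appId).foreach(_.foreach(deleteRecursively))
      finishedApps -= appId
      runningExecutors -= appId
    }
  }

  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    f.delete()
  }
}
```

In the real change the equivalent state lives on the Worker and is updated from its message handlers, which is what the thread-safety discussion below is about.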
@vanzin
Contributor Author

vanzin commented Dec 15, 2014

I tested this on a standalone cluster; verified that the log message shows up in the Worker logs and that the temp files are not left behind in /tmp.

@SparkQA

SparkQA commented Dec 15, 2014

Test build #24470 has started for PR 3705 at commit c0e5ea5.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 15, 2014

Test build #24470 has finished for PR 3705 at commit c0e5ea5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ApplicationFinished(id: String)

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24470/
Test PASSed.

@JoshRosen
Contributor

Note 1: Unit testing this is hard (if even possible), since local-cluster mode doesn't seem to leave the Master/Worker daemons running long enough after sc.stop() is called for the clean up protocol to take effect.

I have a testing framework that can reliably test these sorts of scenarios; I'm planning to open-source it once I've cleaned up and reorganized its code.

Note 2: the code tracking finished apps / app directories in Master.scala and Worker.scala is not really thread-safe, but then the code that modifies other shared maps in those classes isn't either, so this change is not making anything worse.

Yeah, the code there is due for a thread-safety audit / cleanup. A lot of the fields in those files have unnecessarily broad visibility, in many cases because they're read in the web UI.


// Application finished message, used for cleanup

case class ApplicationFinished(id: String)

Maybe this should be under the // Master to Worker heading in this file?

@JoshRosen
Contributor

Regarding thread-safety, it looks like Worker.executors is only modified from methods that are called through the Worker actor, so I think we get implicit thread-safety due to that actor's message-at-a-time processing.

@vanzin
Contributor Author

vanzin commented Dec 22, 2014

implicit thread-safety due to that actor's message-at-a-time processing.

It wasn't very clear to me, from my last read of the documentation, in which cases messages could be processed in parallel using Akka, but I guess I'm due for a second reading. :-)

I'll double-check that I'm not breaking that assumption at least.

@SparkQA

SparkQA commented Dec 22, 2014

Test build #24716 has started for PR 3705 at commit 50eb4b9.

  • This patch merges cleanly.

@vanzin
Contributor Author

vanzin commented Dec 22, 2014

re: thread-safety, the code I added could definitely be triggered from different actors, so I added some synchronization. For the executors map you mention, it seems to be modified only while handling messages from the Master, but it's definitely accessed when handling messages from other sources, and at least with Java's HashMap that is not safe. Maybe Scala's has different concurrency semantics, though.

@JoshRosen
Contributor

It wasn't very clear to me, from my last read of the documentation, in which cases messages could be processed in parallel using Akka, but I guess I'm due for a second reading. :-)

Yeah, this is kind of hard to find, but it's described in the Actors and the Java Memory Model section:

The actor subsequent processing rule: processing of one message happens before processing of the next message by the same actor.

To get parallelism, the Akka solution is to use multiple actors. That said, people do mix the actor model with other concurrency models, so it pays to be careful here.
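To make that rule concrete, here is a small standalone sketch using Akka's classic actor API (the actor and message names are invented for this example, not taken from Spark): even though messages arrive from different senders, the actor processes them one at a time, so a plain mutable map inside the actor needs no extra synchronization.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.collection.mutable

// Hypothetical messages, for illustration only.
case class ExecutorExited(appId: String)
case class AppFinished(appId: String)

class BookkeepingActor extends Actor {
  // A plain (non-thread-safe) mutable map is fine here: Akka guarantees that
  // this actor processes one message at a time, regardless of how many
  // different senders the messages come from.
  private val pendingExecutors = mutable.HashMap.empty[String, Int]

  def receive: Receive = {
    case ExecutorExited(appId) =>
      pendingExecutors(appId) = pendingExecutors.getOrElse(appId, 1) - 1
    case AppFinished(appId) =>
      if (pendingExecutors.getOrElse(appId, 0) <= 0) {
        pendingExecutors -= appId // safe: no concurrent access within the actor
      }
  }
}

object SerializedProcessingDemo extends App {
  val system = ActorSystem("demo")
  val bookkeeper = system.actorOf(Props[BookkeepingActor](), "bookkeeper")
  // Messages from different senders all land in the same mailbox and are
  // handled sequentially by the actor.
  bookkeeper ! ExecutorExited("app-1")
  bookkeeper ! AppFinished("app-1")
  system.terminate()
}
```

This is the property being relied on for Worker.executors above and, in the discussion below, for appDirectories and maybeCleanupApplication.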

@JoshRosen
Contributor

re: thread-safety, the code I added could definitely be triggered from different actors, so I added some synchronization

Do you mean the maybeCleanupApplication method? It looks like it's only called from two places, both message handlers in receiveWithLogging (which is serialized by Akka). Similarly, appDirectories is only accessed in receiveWithLogging and maybeCleanupApplication, so I think all accesses to that field will be serialized by Akka.

@vanzin
Contributor Author

vanzin commented Dec 23, 2014

so I think all accesses to that field will be serialized by Akka.

Hmm, I'm not so sure. From the page you linked:

processing of one message happens before processing of the next message by the same actor.

I read that sentence as: "messages from the same sender are processed in order", not "each receiver will only process one message at a time". Worker.receiveWithLogging handles messages from both Master and Executor, and both could cause appDirectories to be modified.

Anyway, in these cases I tend to err on the side of safety; the extra synchronization here shouldn't cause any performance issues.

@SparkQA

SparkQA commented Dec 23, 2014

Test build #24716 has finished for PR 3705 at commit 50eb4b9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ApplicationFinished(id: String)

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24716/
Test PASSed.

@markhamstra
Contributor

@vanzin You need to read some more about Akka actors.

The fundamental abstraction is that an actor has a message queue: the only way to communicate with the actor is via messages, those messages end up in its queue, and they are handled one at a time in the receive method. That the actor could be receiving messages from more than one sender does not in any way imply that messages will be processed concurrently.

@JoshRosen
Contributor

(lack of) thread-safety issues aside, this PR looks good to me. It makes sense to have application-specific directories now that we have state that is tied to an application's lifecycle (lock files, file caches, etc.) rather than to individual executors' lifecycles.

@vanzin
Contributor Author

vanzin commented Dec 23, 2014

@markhamstra that sounds a little limiting (forcing message handling to be effectively single-threaded), but that's an Akka issue, not a Spark issue. :-) Thanks for the pointer, though.

@JoshRosen
Contributor

Do you think we should remove the `synchronized` now that we're convinced it's no longer necessary? Not a huge deal to leave it, but potentially confusing since it's not the norm to need synchronization within an actor.

@vanzin
Contributor Author

vanzin commented Dec 23, 2014

Sure, I can do that.

@SparkQA

SparkQA commented Dec 23, 2014

Test build #24742 has started for PR 3705 at commit b430534.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 23, 2014

Test build #24742 has finished for PR 3705 at commit b430534.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ApplicationFinished(id: String)

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24742/
Test FAILed.

@JoshRosen
Contributor

That most recent failure is due to a known flaky test, `org.apache.spark.streaming.CheckpointSuite.recovery with file input stream`, so I'm not going to make Jenkins re-test this.

This looks good to me, so I'm going to merge it into master (1.3.0) and branch-1.2 (1.2.1). Thanks for the PR!

asfgit pushed a commit that referenced this pull request Dec 23, 2014
Commit 7aacb7b added support for sharing downloaded files among multiple
executors of the same app. That works great in Yarn, since the app's directory
is cleaned up after the app is done.

But Spark standalone mode didn't do that, so the lock/cache files created
by that change were left around and could eventually fill up the disk hosting
/tmp.

To solve that, create app-specific directories under the local dirs when
launching executors. Multiple executors launched by the same Worker will
use the same app directories, so they should be able to share the downloaded
files. When the application finishes, a new message is sent to all workers
telling them the application has finished; once that message has been received,
and all executors registered for the application shut down, then those
directories will be cleaned up by the Worker.

Note: Unit testing this is hard (if even possible), since local-cluster mode
doesn't seem to leave the Master/Worker daemons running long enough after
`sc.stop()` is called for the clean up protocol to take effect.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #3705 from vanzin/SPARK-4834 and squashes the following commits:

b430534 [Marcelo Vanzin] Remove seemingly unnecessary synchronization.
50eb4b9 [Marcelo Vanzin] Review feedback.
c0e5ea5 [Marcelo Vanzin] [SPARK-4834] [standalone] Clean up application files after app finishes.

(cherry picked from commit dd15536)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
@asfgit asfgit closed this in dd15536 Dec 23, 2014
@vanzin vanzin deleted the SPARK-4834 branch December 23, 2014 20:09