[SPARK-4834] [standalone] Clean up application files after app finishes. #3705

Closed · wants to merge 3 commits

core/src/main/scala/org/apache/spark/deploy/DeployMessages.scala
@@ -88,6 +88,8 @@ private[deploy] object DeployMessages {

case class KillDriver(driverId: String) extends DeployMessage

case class ApplicationFinished(id: String)

// Worker internal

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
@@ -175,4 +177,5 @@ private[deploy] object DeployMessages {
// Liveness checks in various places

case object SendHeartbeat

}

core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -510,7 +510,7 @@ private[spark] class Master(
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
@@ -697,6 +697,11 @@ private[spark] class Master(
}
persistenceEngine.removeApplication(app)
schedule()

// Tell all workers that the application has finished, so they can clean up any app state.
workers.foreach { w =>
w.actor ! ApplicationFinished(app.id)
}
}
}


core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -47,6 +47,7 @@ private[spark] class ExecutorRunner(
val executorDir: File,
val workerUrl: String,
val conf: SparkConf,
val appLocalDirs: Seq[String],
var state: ExecutorState.Value)
extends Logging {

@@ -77,7 +78,7 @@ private[spark] class ExecutorRunner(
/**
* Kill executor process, wait for exit and notify worker to update resource status.
*
* @param message the exception message which caused the executor's death
* @param message the exception message which caused the executor's death
*/
private def killProcess(message: Option[String]) {
var exitCode: Option[Int] = None
@@ -129,6 +130,7 @@ private[spark] class ExecutorRunner(
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

builder.directory(executorDir)
builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
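
The mechanism this file change relies on is an environment hand-off: the worker picks the per-application scratch directories, and the executor child process only learns about them through SPARK_LOCAL_DIRS. A minimal standalone sketch of that hand-off follows; the directory paths, object name, and the `env` child command are illustrative stand-ins, not part of the patch.

// Sketch: pass per-application local dirs to a child process via the environment,
// mirroring how the patched ExecutorRunner exports SPARK_LOCAL_DIRS to the executor.
object LocalDirsHandOffSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative scratch directories the worker would have created for this app.
    val appLocalDirs = Seq("/tmp/spark-app-001-0", "/tmp/spark-app-001-1")

    // "env" is a stand-in for the real executor command; it just prints the
    // child's environment so the hand-off is visible.
    val builder = new ProcessBuilder("env")
    builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
    builder.inheritIO()
    val exit = builder.start().waitFor()
    println(s"child exited with status $exit")
  }
}
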
36 changes: 33 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
import java.util.{UUID, Date}

import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
@@ -109,6 +109,8 @@ private[spark] class Worker(
val finishedExecutors = new HashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
val finishedDrivers = new HashMap[String, DriverRunner]
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]

// The shuffle service is not actually started unless configured.
val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
@@ -292,7 +294,7 @@ private[spark] class Worker(
val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
dir.isDirectory && !isAppStillRunning &&
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
}.foreach { dir =>
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)
}
@@ -337,8 +339,19 @@ private[spark] class Worker(
throw new IOException("Failed to create directory " + executorDir)
}

// Create local dirs for the executor. These are passed to the executor via the
// SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
// application finishes.
val appLocalDirs = appDirectories.get(appId).getOrElse {
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
Utils.createDirectory(dir).getAbsolutePath()
}.toSeq
}
appDirectories(appId) = appLocalDirs

val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -375,6 +388,7 @@ private[spark] class Worker(
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
}
maybeCleanupApplication(appId)
}

case KillExecutor(masterUrl, appId, execId) =>
@@ -444,6 +458,9 @@ private[spark] class Worker(
case ReregisterWithMaster =>
reregisterWithMaster()

case ApplicationFinished(id) =>
finishedApps += id
maybeCleanupApplication(id)
}

private def masterDisconnected() {
@@ -452,6 +469,19 @@ private[spark] class Worker(
registerWithMaster()
}

private def maybeCleanupApplication(id: String): Unit = {
val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
if (shouldCleanup) {
finishedApps -= id
appDirectories.remove(id).foreach { dirList =>
logInfo(s"Cleaning up local directories for application $id")
dirList.foreach { dir =>
Utils.deleteRecursively(new File(dir))
}
}
}
}

def generateWorkerId(): String = {
"worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
}
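
Taken together, the Worker changes implement a two-condition cleanup: an application's local directories are removed only after the master has reported the application finished and the last executor for that application has exited on this worker, whichever happens later. A condensed standalone sketch of that bookkeeping follows; the class and method names are illustrative, and the per-app executor count stands in for the Worker's executors map.

import scala.collection.mutable.{HashMap, HashSet}

// Sketch: clean up an app's local dirs only when BOTH conditions hold:
// the master reported the app finished, and no executors for it remain on this worker.
class AppDirCleanupSketch {
  private val appDirectories = new HashMap[String, Seq[String]]
  private val finishedApps = new HashSet[String]
  private val runningExecutors = new HashMap[String, Int] // appId -> live executor count

  def executorLaunched(appId: String, dirs: Seq[String]): Unit = {
    appDirectories(appId) = dirs
    runningExecutors(appId) = runningExecutors.getOrElse(appId, 0) + 1
  }

  def executorExited(appId: String): Unit = {
    runningExecutors(appId) = runningExecutors.getOrElse(appId, 1) - 1
    maybeCleanup(appId)
  }

  def applicationFinished(appId: String): Unit = {
    finishedApps += appId
    maybeCleanup(appId)
  }

  private def maybeCleanup(appId: String): Unit = {
    if (finishedApps.contains(appId) && runningExecutors.getOrElse(appId, 0) == 0) {
      finishedApps -= appId
      appDirectories.remove(appId).foreach { dirs =>
        // The real Worker calls Utils.deleteRecursively on each directory here.
        println(s"would delete local dirs of $appId: ${dirs.mkString(", ")}")
      }
    }
  }
}
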
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -246,8 +246,11 @@ private[spark] object Utils extends Logging {
retval
}

/** Create a temporary directory inside the given parent directory */
def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
/**
* Create a directory inside the given parent directory. The directory is guaranteed to be
* newly created, and is not marked for automatic deletion.
*/
def createDirectory(root: String): File = {
var attempts = 0
val maxAttempts = 10
var dir: File = null
@@ -265,6 +268,15 @@
} catch { case e: SecurityException => dir = null; }
}

dir
}

/**
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
*/
def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
val dir = createDirectory(root)
registerShutdownDeleteDir(dir)
dir
}
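
The Utils change factors the retry loop out of createTempDir into a new createDirectory, so the Worker can create directories whose lifetime it manages itself while createTempDir keeps its delete-on-shutdown behavior. A simplified standalone sketch of that split is shown below; the shutdown hook is a stand-in for Spark's registerShutdownDeleteDir and, unlike it, deletes only an empty directory.

import java.io.{File, IOException}
import java.util.UUID

// Sketch: createDirectory owns the retry loop; only createTempDir schedules
// deletion at JVM shutdown, so callers like the Worker can manage lifetimes themselves.
object DirectoryUtilsSketch {
  def createDirectory(root: String): File = {
    val maxAttempts = 10
    var attempts = 0
    var dir: File = null
    while (dir == null) {
      attempts += 1
      if (attempts > maxAttempts) {
        throw new IOException(
          s"Failed to create a directory under $root after $maxAttempts attempts")
      }
      val candidate = new File(root, "spark-" + UUID.randomUUID.toString)
      if (candidate.mkdirs()) {
        dir = candidate
      }
    }
    dir
  }

  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
    val dir = createDirectory(root)
    // Stand-in for registerShutdownDeleteDir; deletes only if the dir is still empty.
    sys.addShutdownHook { dir.delete() }
    dir
  }

  def main(args: Array[String]): Unit = {
    println("created: " + createTempDir().getAbsolutePath)
  }
}
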

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, ExecutorState.RUNNING)
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}

def createDriverRunner(): DriverRunner = {

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -33,7 +33,8 @@ class ExecutorRunnerTest extends FunSuite {
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
assert(builder.command().last === appId)
}