Skip to content

Commit c0e5ea5

Browse files
Author: Marcelo Vanzin (committed)
[SPARK-4834] [standalone] Clean up application files after app finishes.
Commit 7aacb7b added support for sharing downloaded files among multiple executors of the same app. That works great in Yarn, since the app's directory is cleaned up after the app is done. But Spark standalone mode didn't do that, so the lock/cache files created by that change were left around and could eventually fill up the disk hosting /tmp. To solve that, create app-specific directories under the local dirs when launching executors. Multiple executors launched by the same Worker will use the same app directories, so they should be able to share the downloaded files. When the application finishes, a new message is sent to all executors telling them the application has finished; once that message has been received, and all executors registered for the application shut down, then those directories will be cleaned up by the Worker. Note 1: Unit testing this is hard (if even possible), since local-cluster mode doesn't seem to leave the Master/Worker daemons running long enough after `sc.stop()` is called for the clean up protocol to take effect. Note 2: the code tracking finished apps / app directories in Master.scala and Worker.scala is not really thread-safe, but then the code that modifies other shared maps in those classes isn't either, so this change is not making anything worse.
1 parent ef84dab commit c0e5ea5

File tree

7 files changed: +64 −9 lines changed

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -175,4 +175,9 @@ private[deploy] object DeployMessages {
175175
// Liveness checks in various places
176176

177177
case object SendHeartbeat
178+
179+
// Application finished message, used for cleanup
180+
181+
case class ApplicationFinished(id: String)
182+
178183
}

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -510,7 +510,7 @@ private[spark] class Master(
510510
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
511511
val numWorkersAlive = shuffledAliveWorkers.size
512512
var curPos = 0
513-
513+
514514
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
515515
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
516516
// start from the last worker that was assigned a driver, and continue onwards until we have
@@ -697,6 +697,11 @@ private[spark] class Master(
697697
}
698698
persistenceEngine.removeApplication(app)
699699
schedule()
700+
701+
// Tell all workers that the application has finished, so they can clean up any app state.
702+
workers.foreach { w =>
703+
w.actor ! ApplicationFinished(app.id)
704+
}
700705
}
701706
}
702707

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@ private[spark] class ExecutorRunner(
4747
val executorDir: File,
4848
val workerUrl: String,
4949
val conf: SparkConf,
50+
val appLocalDirs: Seq[String],
5051
var state: ExecutorState.Value)
5152
extends Logging {
5253

@@ -77,7 +78,7 @@ private[spark] class ExecutorRunner(
7778
/**
7879
* Kill executor process, wait for exit and notify worker to update resource status.
7980
*
80-
* @param message the exception message which caused the executor's death
81+
* @param message the exception message which caused the executor's death
8182
*/
8283
private def killProcess(message: Option[String]) {
8384
var exitCode: Option[Int] = None
@@ -129,6 +130,7 @@ private[spark] class ExecutorRunner(
129130
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
130131

131132
builder.directory(executorDir)
133+
builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
132134
// In case we are running this from within the Spark Shell, avoid creating a "scala"
133135
// parent process for the executor command
134136
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 33 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
2323
import java.util.{UUID, Date}
2424

2525
import scala.collection.JavaConversions._
26-
import scala.collection.mutable.HashMap
26+
import scala.collection.mutable.{HashMap, HashSet}
2727
import scala.concurrent.duration._
2828
import scala.language.postfixOps
2929
import scala.util.Random
@@ -109,6 +109,8 @@ private[spark] class Worker(
109109
val finishedExecutors = new HashMap[String, ExecutorRunner]
110110
val drivers = new HashMap[String, DriverRunner]
111111
val finishedDrivers = new HashMap[String, DriverRunner]
112+
val appDirectories = new HashMap[String, Seq[String]]
113+
val finishedApps = new HashSet[String]
112114

113115
// The shuffle service is not actually started unless configured.
114116
val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
@@ -292,7 +294,7 @@ private[spark] class Worker(
292294
val isAppStillRunning = executors.values.map(_.appId).contains(appIdFromDir)
293295
dir.isDirectory && !isAppStillRunning &&
294296
!Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
295-
}.foreach { dir =>
297+
}.foreach { dir =>
296298
logInfo(s"Removing directory: ${dir.getPath}")
297299
Utils.deleteRecursively(dir)
298300
}
@@ -337,8 +339,19 @@ private[spark] class Worker(
337339
throw new IOException("Failed to create directory " + executorDir)
338340
}
339341

342+
// Create local dirs for the executor. These are passed to the executor via the
343+
// SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
344+
// application finishes.
345+
val appLocalDirs = appDirectories.get(appId).getOrElse {
346+
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
347+
Utils.createDirectory(dir).getAbsolutePath()
348+
}.toSeq
349+
}
350+
appDirectories(appId) = appLocalDirs
351+
340352
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
341-
self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
353+
self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs,
354+
ExecutorState.LOADING)
342355
executors(appId + "/" + execId) = manager
343356
manager.start()
344357
coresUsed += cores_
@@ -375,6 +388,7 @@ private[spark] class Worker(
375388
message.map(" message " + _).getOrElse("") +
376389
exitStatus.map(" exitStatus " + _).getOrElse(""))
377390
}
391+
maybeCleanupApplication(appId)
378392
}
379393

380394
case KillExecutor(masterUrl, appId, execId) =>
@@ -444,6 +458,9 @@ private[spark] class Worker(
444458
case ReregisterWithMaster =>
445459
reregisterWithMaster()
446460

461+
case ApplicationFinished(id) =>
462+
finishedApps += id
463+
maybeCleanupApplication(id)
447464
}
448465

449466
private def masterDisconnected() {
@@ -452,6 +469,19 @@ private[spark] class Worker(
452469
registerWithMaster()
453470
}
454471

472+
private def maybeCleanupApplication(id: String): Unit = synchronized {
473+
val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
474+
if (shouldCleanup) {
475+
finishedApps -= id
476+
appDirectories.remove(id).foreach {
477+
logInfo(s"Cleaning up local directories for application $id")
478+
_.foreach { dir =>
479+
Utils.deleteRecursively(new File(dir))
480+
}
481+
}
482+
}
483+
}
484+
455485
def generateWorkerId(): String = {
456486
"worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
457487
}

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -246,8 +246,11 @@ private[spark] object Utils extends Logging {
246246
retval
247247
}
248248

249-
/** Create a temporary directory inside the given parent directory */
250-
def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
249+
/**
250+
* Create a directory inside the given parent directory. The directory is guaranteed to be
251+
* newly created, and is not marked for automatic deletion.
252+
*/
253+
def createDirectory(root: String): File = {
251254
var attempts = 0
252255
val maxAttempts = 10
253256
var dir: File = null
@@ -265,6 +268,15 @@ private[spark] object Utils extends Logging {
265268
} catch { case e: SecurityException => dir = null; }
266269
}
267270

271+
dir
272+
}
273+
274+
/**
275+
* Create a temporary directory inside the given parent directory. The directory will be
276+
* automatically deleted when the VM shuts down.
277+
*/
278+
def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
279+
val dir = createDirectory(root)
268280
registerShutdownDeleteDir(dir)
269281
dir
270282
}

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -119,7 +119,7 @@ class JsonProtocolSuite extends FunSuite {
119119
def createExecutorRunner(): ExecutorRunner = {
120120
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
121121
new File("sparkHome"), new File("workDir"), "akka://worker",
122-
new SparkConf, ExecutorState.RUNNING)
122+
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
123123
}
124124

125125
def createDriverRunner(): DriverRunner = {

core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,8 @@ class ExecutorRunnerTest extends FunSuite {
3333
val appDesc = new ApplicationDescription("app name", Some(8), 500,
3434
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
3535
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
36-
new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
36+
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
37+
ExecutorState.RUNNING)
3738
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
3839
assert(builder.command().last === appId)
3940
}

0 commit comments

Comments (0)