Commit a1f8779
CodingCat authored and aarondav committed
SPARK-1104: kill Process in workerThread of ExecutorRunner

As reported in https://spark-project.atlassian.net/browse/SPARK-1104

By @pwendell: "Sometimes due to large shuffles executors will take a long time shutting down. In particular this can happen if large numbers of shuffle files are around (this will be alleviated by SPARK-1103, but nonetheless...). The symptom is you have DEAD workers sitting around in the UI and the existing workers keep trying to re-register but can't because they've been assumed dead."

In this patch, I add lines in the handler of InterruptedException in workerThread of executorRunner, so that the process.destroy() and process.waitFor() can only block the workerThread instead of blocking the worker Actor...

---------
analysis: process.destroy() is a blocking method, i.e. it only returns when all shutdownHook threads return...so calling it in Worker thread will make Worker block for a long while....

about what will happen on the shutdown hooks when the JVM process is killed: http://www.tutorialspoint.com/java/lang/runtime_addshutdownhook.htm

Author: CodingCat <zhunansjtu@gmail.com>

Closes #35 from CodingCat/SPARK-1104 and squashes the following commits:

85767da [CodingCat] add null checking and remove unnecessary killProce
3107aeb [CodingCat] address Aaron's comments
eb615ba [CodingCat] kill the process when the error happens
0accf2f [CodingCat] set process to null after killed it
1d511c8 [CodingCat] kill Process in workerThread

(cherry picked from commit f99af85)
Signed-off-by: Aaron Davidson <aaron@databricks.com>

1 parent 2250c7a commit a1f8779
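
The commit message describes moving the blocking `process.destroy()` / `process.waitFor()` pair out of the caller of `kill()` and into the thread that runs the child process. A minimal standalone sketch of that pattern might look like the following. `SketchRunner` and all of its members are hypothetical names used only for illustration, not Spark code, and the demo assumes a Unix-like system with a `sleep` command on the PATH.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical, simplified stand-in for ExecutorRunner (not Spark code).
class SketchRunner(command: Seq[String]) {
  @volatile private var child: Process = null
  private val startedLatch = new CountDownLatch(1)
  private val finishedLatch = new CountDownLatch(1)

  private val workerThread =
    new Thread(new Runnable { override def run(): Unit = runChild() }, "sketch-runner")

  private def runChild(): Unit = {
    try {
      child = new ProcessBuilder(command: _*).start()
      startedLatch.countDown()
      child.waitFor() // blocks until the child exits or this thread is interrupted
    } catch {
      // The cleanup, which may be slow, stays on workerThread in both cases.
      case _: InterruptedException => killProcess()
      case _: Exception            => killProcess()
    } finally {
      startedLatch.countDown()
      finishedLatch.countDown()
    }
  }

  // Mirrors the killProcess() helper from the patch: null check, destroy, wait.
  private def killProcess(): Unit = {
    if (child != null) {
      child.destroy()
      child.waitFor()
    }
  }

  def start(): Unit = workerThread.start()

  // Only interrupts workerThread; it does not wait for the child to exit.
  def kill(): Unit = workerThread.interrupt()

  def awaitStarted(timeoutMs: Long): Boolean =
    startedLatch.await(timeoutMs, TimeUnit.MILLISECONDS)

  def awaitFinished(timeoutMs: Long): Boolean =
    finishedLatch.await(timeoutMs, TimeUnit.MILLISECONDS)

  def childAlive: Boolean = child != null && child.isAlive
}

object SketchRunnerDemo {
  def main(args: Array[String]): Unit = {
    val runner = new SketchRunner(Seq("sleep", "30")) // assumes `sleep` exists
    runner.start()
    runner.awaitStarted(5000)
    runner.kill() // the caller is not blocked by destroy()/waitFor()
    println("finished: " + runner.awaitFinished(5000))
  }
}
```

In this sketch the caller of `kill()` pays only for `Thread.interrupt()`. Any time spent waiting for the child to exit is spent on `workerThread`, which is the trade-off the patch is aiming for with the worker Actor.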

1 file changed, 14 additions and 17 deletions

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 14 additions & 17 deletions
@@ -58,30 +58,29 @@ private[spark] class ExecutorRunner(
       override def run() { fetchAndRunExecutor() }
     }
     workerThread.start()
-
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = new Thread() {
       override def run() {
-        if (process != null) {
-          logInfo("Shutdown hook killing child process.")
-          process.destroy()
-          process.waitFor()
-        }
+        killProcess()
       }
     }
     Runtime.getRuntime.addShutdownHook(shutdownHook)
   }
 
+  private def killProcess() {
+    if (process != null) {
+      logInfo("Killing process!")
+      process.destroy()
+      process.waitFor()
+    }
+  }
+
   /** Stop this executor runner, including killing the process it launched */
   def kill() {
     if (workerThread != null) {
+      // the workerThread will kill the child process when interrupted
       workerThread.interrupt()
       workerThread = null
-      if (process != null) {
-        logInfo("Killing process!")
-        process.destroy()
-        process.waitFor()
-      }
       state = ExecutorState.KILLED
       worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
@@ -128,7 +127,6 @@ private[spark] class ExecutorRunner(
       // parent process for the executor command
       env.put("SPARK_LAUNCH_WITH_SCALA", "0")
       process = builder.start()
-
       val header = "Spark Executor Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
 
@@ -148,14 +146,13 @@ private[spark] class ExecutorRunner(
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {
-      case interrupted: InterruptedException =>
+      case interrupted: InterruptedException => {
         logInfo("Runner thread for executor " + fullId + " interrupted")
-
+        killProcess()
+      }
       case e: Exception => {
         logError("Error running executor", e)
-        if (process != null) {
-          process.destroy()
-        }
+        killProcess()
         state = ExecutorState.FAILED
         val message = e.getClass + ": " + e.getMessage
         worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
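
The diff registers the hook with `Runtime.getRuntime.addShutdownHook` in `start()` and removes it in `kill()`. As a small illustration of how those two JDK calls pair up, here is a sketch with a hypothetical `HookSketch` helper (not part of Spark). It relies on the documented behaviour of `removeShutdownHook`: it returns `true` only if the hook was registered, and it throws `IllegalStateException` if the JVM is already shutting down. The guard around that exception is an addition of this sketch; the patch itself calls `removeShutdownHook` directly.

```scala
// Hypothetical helper for illustration only (not Spark code).
object HookSketch {
  // Registers `body` as a JVM shutdown hook and returns the hook thread,
  // so the caller can remove it later.
  def register(body: () => Unit): Thread = {
    val hook = new Thread(
      new Runnable { override def run(): Unit = body() },
      "sketch-shutdown-hook")
    Runtime.getRuntime.addShutdownHook(hook)
    hook
  }

  // Returns true if the hook was registered and has now been removed.
  // Returns false if it was not registered, or if the JVM is already
  // shutting down (removeShutdownHook throws IllegalStateException then).
  def unregister(hook: Thread): Boolean = {
    try {
      Runtime.getRuntime.removeShutdownHook(hook)
    } catch {
      case _: IllegalStateException => false
    }
  }
}
```

A caller would keep the `Thread` returned by `register` and pass it to `unregister` once the child process has been stopped by other means, much as `kill()` does with `shutdownHook` in the diff.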
