Skip to content

Commit 2aecda2

Browse files
committed
[SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the shutdown hook
1. Make sure workers and masters exit so that no worker or master will still be running when triggering the shutdown hook. 2. Set ExecutorState to FAILED if it's still RUNNING when executing the shutdown hook. This should fix the potential exceptions when exiting a local cluster ``` java.lang.AssertionError: assertion failed: executor 4 state transfer from RUNNING to RUNNING is illegal at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.deploy.master.Master$$anonfun$receive$1.applyOrElse(Master.scala:260) at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) java.lang.IllegalStateException: Shutdown hooks cannot be modified during shutdown. at org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:246) at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:191) at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:180) at org.apache.spark.deploy.worker.ExecutorRunner.start(ExecutorRunner.scala:73) at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:474) at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` Author: Shixiong Zhu <shixiong@databricks.com> Closes #10269 from zsxwing/executor-state.
1 parent 8af2f8c commit 2aecda2

File tree

3 files changed

+9
-3
lines changed

3 files changed

+9
-3
lines changed

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ class LocalSparkCluster(
7575
// Stop the workers before the master so they don't get upset that it disconnected
7676
workerRpcEnvs.foreach(_.shutdown())
7777
masterRpcEnvs.foreach(_.shutdown())
78+
workerRpcEnvs.foreach(_.awaitTermination())
79+
masterRpcEnvs.foreach(_.awaitTermination())
7880
masterRpcEnvs.clear()
7981
workerRpcEnvs.clear()
8082
}

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,8 @@ private[deploy] class Master(
257257
exec.state = state
258258

259259
if (state == ExecutorState.RUNNING) {
260-
if (oldState != ExecutorState.LAUNCHING) {
261-
logWarning(s"Executor $execId state transfer from $oldState to RUNNING is unexpected")
262-
}
260+
assert(oldState == ExecutorState.LAUNCHING,
261+
s"executor $execId state transfer from $oldState to RUNNING is illegal")
263262
appInfo.resetRetryCount()
264263
}
265264

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ private[deploy] class ExecutorRunner(
7171
workerThread.start()
7272
// Shutdown hook that kills actors on shutdown.
7373
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
74+
// It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
75+
// be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
76+
if (state == ExecutorState.RUNNING) {
77+
state = ExecutorState.FAILED
78+
}
7479
killProcess(Some("Worker shutting down")) }
7580
}
7681

0 commit comments

Comments
 (0)