Commit e7039dc (1 parent: c84d916)

Author: Marcelo Vanzin

[SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.

This change adds some new utility code to handle shutdown hooks in Spark. The main goal is to take advantage of Hadoop 2.x's API for shutdown hooks, which allows Spark to register a hook that runs before the one that cleans up HDFS clients. This avoids races that would cause exceptions to show up, as well as other issues such as failure to properly close event logs. Unfortunately, Hadoop 1.x does not have such APIs, so in that case correctness is still left to chance.
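
To make the new registration API concrete, below is a minimal usage sketch built only from the Utils.addShutdownHook and Utils.removeShutdownHook signatures added in this commit. It is illustrative, not part of the change: the example package, the flushEventLog action, and the priority value 150 are hypothetical (the package matters because Utils is private[spark]).

package org.apache.spark.example

import org.apache.spark.util.Utils

object ShutdownHookUsage {
  // Hypothetical cleanup action, for illustration only.
  def flushEventLog(): Unit = println("flushing event log")

  def main(args: Array[String]): Unit = {
    // Register with the default priority (Utils.DEFAULT_SHUTDOWN_PRIORITY = 100).
    val handle = Utils.addShutdownHook { () => flushEventLog() }

    // Register with an explicit priority; hooks with higher priority values run first,
    // and all Spark hooks run before Hadoop's FileSystem cleanup hook on Hadoop 2.x.
    Utils.addShutdownHook(150, () => println("runs before the default-priority hook"))

    // The opaque handle returned by addShutdownHook can be used to unregister it again.
    Utils.removeShutdownHook(handle)
  }
}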

File tree: 9 files changed, +187, -115 lines

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala (+2, -4)

@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.SignalLogger
+import org.apache.spark.util.{SignalLogger, Utils}
 
 /**
  * A web server that renders SparkUIs of completed applications.
@@ -194,9 +194,7 @@ object HistoryServer extends Logging {
     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
 
-    Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
-      override def run(): Unit = server.stop()
-    })
+    Utils.addShutdownHook { () => server.stop() }
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala (+4, -8)

@@ -28,6 +28,7 @@ import com.google.common.io.Files
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.Utils
 import org.apache.spark.util.logging.FileAppender
 
 /**
@@ -61,20 +62,15 @@ private[deploy] class ExecutorRunner(
 
   // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
   // make sense to remove this in the future.
-  private var shutdownHook: Thread = null
+  private var shutdownHook: AnyRef = null
 
   private[worker] def start() {
     workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
-    shutdownHook = new Thread() {
-      override def run() {
-        killProcess(Some("Worker shutting down"))
-      }
-    }
-    Runtime.getRuntime.addShutdownHook(shutdownHook)
+    shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
  }
 
  /**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
     workerThread = null
     state = ExecutorState.KILLED
     try {
-      Runtime.getRuntime.removeShutdownHook(shutdownHook)
+      Utils.removeShutdownHook(shutdownHook)
     } catch {
       case e: IllegalStateException => None
     }

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala (+5, -13)

@@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
     }
   }
 
-  private def addShutdownHook(): Thread = {
-    val shutdownHook = new Thread("delete Spark local dirs") {
-      override def run(): Unit = Utils.logUncaughtExceptions {
-        logDebug("Shutdown hook called")
-        DiskBlockManager.this.doStop()
-      }
+  private def addShutdownHook(): AnyRef = {
+    Utils.addShutdownHook { () =>
+      logDebug("Shutdown hook called")
+      DiskBlockManager.this.doStop()
     }
-    Runtime.getRuntime.addShutdownHook(shutdownHook)
-    shutdownHook
   }
 
   /** Cleanup local dirs and stop shuffle sender. */
   private[spark] def stop() {
     // Remove the shutdown hook. It causes memory leaks if we leave it around.
-    try {
-      Runtime.getRuntime.removeShutdownHook(shutdownHook)
-    } catch {
-      case e: IllegalStateException => None
-    }
+    Utils.removeShutdownHook(shutdownHook)
     doStop()
   }
 

core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala (+11, -13)

@@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager(
 
   private def addShutdownHook() {
     tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
-      override def run(): Unit = Utils.logUncaughtExceptions {
-        logDebug("Shutdown hook called")
-        tachyonDirs.foreach { tachyonDir =>
-          try {
-            if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
-              Utils.deleteRecursively(tachyonDir, client)
-            }
-          } catch {
-            case e: Exception =>
-              logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+    Utils.addShutdownHook { () =>
+      logDebug("Shutdown hook called")
+      tachyonDirs.foreach { tachyonDir =>
+        try {
+          if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+            Utils.deleteRecursively(tachyonDir, client)
          }
+        } catch {
+          case e: Exception =>
+            logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
        }
-        client.close()
      }
-    })
+      client.close()
+    }
   }
 }

core/src/main/scala/org/apache/spark/util/Utils.scala (+109, -18)

@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.{Properties, Locale, Random, UUID}
+import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
 
@@ -30,7 +30,7 @@ import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.{ByteStreams, Files}
@@ -64,9 +64,15 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
+  val DEFAULT_SHUTDOWN_PRIORITY = 100
+
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
+
+  private val shutdownHooks = new SparkShutdownHookManager()
+  shutdownHooks.install()
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -176,18 +182,16 @@ private[spark] object Utils extends Logging {
   private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
 
   // Add a shutdown hook to delete the temp dirs when the JVM exits
-  Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
-    override def run(): Unit = Utils.logUncaughtExceptions {
-      logDebug("Shutdown hook called")
-      shutdownDeletePaths.foreach { dirPath =>
-        try {
-          Utils.deleteRecursively(new File(dirPath))
-        } catch {
-          case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
-        }
+  addShutdownHook { () =>
+    logDebug("Shutdown hook called")
+    shutdownDeletePaths.foreach { dirPath =>
+      try {
+        Utils.deleteRecursively(new File(dirPath))
+      } catch {
+        case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
      }
    }
-  })
+  }
 
   // Register the path to be deleted via shutdown hook
   def registerShutdownDeleteDir(file: File) {
@@ -613,7 +617,7 @@
     }
     Utils.setupSecureURLConnection(uc, securityMgr)
 
-    val timeoutMs =
+    val timeoutMs =
       conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
     uc.setConnectTimeout(timeoutMs)
     uc.setReadTimeout(timeoutMs)
@@ -1172,7 +1176,7 @@
   /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
-   *
+   *
   * NOTE: This method is to be called by the spark-started JVM process.
   */
  def tryOrExit(block: => Unit) {
@@ -1185,11 +1189,11 @@
  }
 
  /**
-   * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
+   * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
   * exception
-   *
-   * NOTE: This method is to be called by the driver-side components to avoid stopping the
-   * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+   *
+   * NOTE: This method is to be called by the driver-side components to avoid stopping the
+   * user-started JVM process completely; in contrast, tryOrExit is to be called in the
   * spark-started JVM process .
   */
  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
@@ -2132,6 +2136,93 @@
       .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
   }
 
+  /**
+   * Adds a shutdown hook with default priority.
+   */
+  def addShutdownHook(hook: () => Unit): AnyRef = {
+    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+  }
+
+  /**
+   * Adds a shutdown hook with the given priority. Hooks with higher priority values run
+   * first.
+   */
+  def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+    shutdownHooks.add(priority, hook)
+  }
+
+  /**
+   * Remove a previously installed shutdown hook.
+   */
+  def removeShutdownHook(ref: AnyRef): Boolean = {
+    shutdownHooks.remove(ref)
+  }
+
+}
+
+private [util] class SparkShutdownHookManager {
+
+  private val hooks = new PriorityQueue[SparkShutdownHook]()
+  private var shuttingDown = false
+
+  /**
+   * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
+   * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
+   * the best.
+   */
+  def install(): Unit = {
+    val hookTask = new Runnable() {
+      override def run(): Unit = runAll()
+    }
+    Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
+      case Success(shmClass) =>
+        val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get(null)
+          .asInstanceOf[Int]
+        val shm = shmClass.getMethod("get").invoke(null)
+        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+      case Failure(_) =>
+        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"))
+    }
+  }
+
+  def runAll(): Unit = synchronized {
+    shuttingDown = true
+    while (!hooks.isEmpty()) {
+      Utils.logUncaughtExceptions(hooks.poll().run())
+    }
+  }
+
+  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+    checkState()
+    val hookRef = new SparkShutdownHook(priority, hook)
+    hooks.add(hookRef)
+    hookRef
+  }
+
+  def remove(ref: AnyRef): Boolean = synchronized {
+    checkState()
+    hooks.remove(ref)
+  }
+
+  private def checkState(): Unit = {
+    if (shuttingDown) {
+      throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+    }
+  }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+  extends Comparable[SparkShutdownHook] {
+
+  override def compareTo(other: SparkShutdownHook): Int = {
+    other.priority - priority
+  }
+
+  def run(): Unit = hook()
+
 }
 
 /**
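
For readers unfamiliar with Hadoop's hook manager: the reflective install() above is only needed because Spark must also run against Hadoop 1.x, which lacks the class. On a Hadoop 2.x classpath it amounts to the direct call sketched below. This is an illustrative sketch, not part of the commit; runAllSparkHooks is a hypothetical stand-in for SparkShutdownHookManager.runAll().

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

object Hadoop2HookInstall {
  // Hypothetical stand-in for SparkShutdownHookManager.runAll().
  def runAllSparkHooks(): Unit = println("running Spark shutdown hooks")

  def install(): Unit = {
    val hookTask = new Runnable {
      override def run(): Unit = runAllSparkHooks()
    }
    // Hadoop's ShutdownHookManager runs higher-priority hooks first, and the HDFS
    // client cleanup hook is registered at FileSystem.SHUTDOWN_HOOK_PRIORITY, so
    // registering Spark's hook at a higher priority makes it run before HDFS clients
    // are closed, which is exactly the race this commit fixes.
    ShutdownHookManager.get().addShutdownHook(hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
  }
}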

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala (+24, -8)

@@ -17,14 +17,16 @@
 
 package org.apache.spark.util
 
-import scala.util.Random
-
 import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
 import java.text.DecimalFormatSymbols
 import java.util.concurrent.TimeUnit
 import java.util.Locale
+import java.util.PriorityQueue
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
@@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 
 class UtilsSuite extends FunSuite with ResetSystemProperties {
-
+
   test("timeConversion") {
     // Test -1
     assert(Utils.timeStringAsSeconds("-1") === -1)
-
+
     // Test zero
     assert(Utils.timeStringAsSeconds("0") === 0)
-
+
     assert(Utils.timeStringAsSeconds("1") === 1)
     assert(Utils.timeStringAsSeconds("1s") === 1)
     assert(Utils.timeStringAsSeconds("1000ms") === 1)
@@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
     assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
     assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
-
+
     assert(Utils.timeStringAsMs("1") === 1)
     assert(Utils.timeStringAsMs("1ms") === 1)
     assert(Utils.timeStringAsMs("1000us") === 1)
@@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
     assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
     assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
-
+
     // Test invalid strings
     intercept[NumberFormatException] {
       Utils.timeStringAsMs("This breaks 600s")
@@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
       Utils.timeStringAsMs("This 123s breaks")
     }
   }
-
+
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")
     assert(Utils.bytesToString(1500) === "1500.0 B")
@@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     val newFileName = new File(testFileDir, testFileName)
     assert(newFileName.isFile())
   }
+
+  test("shutdown hook manager") {
+    val manager = new SparkShutdownHookManager()
+    val output = new ListBuffer[Int]()
+
+    val hook1 = manager.add(1, () => output += 1)
+    manager.add(3, () => output += 3)
+    manager.add(2, () => output += 2)
+    manager.add(4, () => output += 4)
+    manager.remove(hook1)
+
+    manager.runAll()
+    assert(output.toList === List(4, 3, 2))
+  }
 }
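
The expected List(4, 3, 2) in the new test follows directly from the queue semantics in SparkShutdownHookManager: java.util.PriorityQueue.poll() returns the smallest element under compareTo, and SparkShutdownHook reverses the comparison, so hooks with higher priority values come out first (hook 1 was removed before runAll). A self-contained sketch of just that ordering, using a simplified stand-in for SparkShutdownHook:

import java.util.PriorityQueue

object HookOrderingSketch {
  // Simplified stand-in for SparkShutdownHook: only the ordering logic is modeled.
  class Hook(val priority: Int) extends Comparable[Hook] {
    // Reversed comparison, as in SparkShutdownHook, so poll() yields the
    // hook with the highest priority value first.
    override def compareTo(other: Hook): Int = other.priority - priority
  }

  def main(args: Array[String]): Unit = {
    val hooks = new PriorityQueue[Hook]()
    Seq(1, 3, 2, 4).foreach(p => hooks.add(new Hook(p)))

    // Prints 4, 3, 2, 1: the same order in which the manager runs its hooks.
    while (!hooks.isEmpty()) {
      println(hooks.poll().priority)
    }
  }
}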
