[SPARK-12365][CORE] Use ShutdownHookManager where Runtime.getRuntime.addShutdownHook() is called #10325

Closed · wants to merge 3 commits
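
The change standardizes shutdown-hook registration on org.apache.spark.util.ShutdownHookManager across the external shuffle service, the Mesos cluster dispatcher, and the Hive Thrift-server CLI, and adds a scalastyle rule so new code cannot quietly reintroduce raw Runtime.getRuntime.addShutdownHook calls. As a rough sketch of the before/after pattern (note that ShutdownHookManager is private[spark], so this only compiles inside Spark itself; `cleanup` is a hypothetical stand-in for a daemon's teardown logic):

import org.apache.spark.util.ShutdownHookManager

object ShutdownStyleSketch {
  // Hypothetical resource-release logic, for illustration only.
  def cleanup(): Unit = println("releasing resources")

  def main(args: Array[String]): Unit = {
    // Old style: a raw JVM hook, registered directly with the runtime and
    // invisible to Spark's own shutdown ordering.
    // scalastyle:off runtimeaddshutdownhook
    Runtime.getRuntime.addShutdownHook(new Thread("my-shutdown-thread") {
      override def run(): Unit = cleanup()
    })
    // scalastyle:on runtimeaddshutdownhook

    // New style: register through Spark's manager, which multiplexes every
    // hook through a single JVM (or Hadoop) shutdown hook.
    ShutdownHookManager.addShutdownHook { () =>
      cleanup()
    }
  }
}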

ExternalShuffleService.scala:

@@ -28,7 +28,7 @@ import org.apache.spark.network.sasl.SaslServerBootstrap
 import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.util.TransportConf
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -118,19 +118,13 @@ object ExternalShuffleService extends Logging {
     server = newShuffleService(sparkConf, securityManager)
     server.start()
 
-    installShutdownHook()
+    ShutdownHookManager.addShutdownHook { () =>
+      logInfo("Shutting down shuffle service.")
+      server.stop()
+      barrier.countDown()
+    }
 
     // keep running until the process is terminated
     barrier.await()
   }
-
-  private def installShutdownHook(): Unit = {
-    Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
-      override def run() {
-        logInfo("Shutting down shuffle service.")
-        server.stop()
-        barrier.countDown()
-      }
-    })
-  }
 }
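
Beyond swapping the registration call, the hunk keeps the service's keep-alive idiom: the main thread parks on a CountDownLatch that only the shutdown hook releases. A minimal, self-contained sketch of that idiom, using the standard library's sys.addShutdownHook rather than Spark's private utility:

import java.util.concurrent.CountDownLatch

object KeepAliveSketch {
  def main(args: Array[String]): Unit = {
    val barrier = new CountDownLatch(1)

    // The hook does its cleanup and then releases the latch.
    sys.addShutdownHook {
      println("Shutting down service.")
      barrier.countDown()
    }

    // Park the main thread; the process stays alive until it is terminated
    // (e.g. by SIGTERM), at which point the hook above fires.
    barrier.await()
  }
}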

MesosClusterDispatcher.scala:

@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
 import org.apache.spark.deploy.mesos.ui.MesosClusterUI
 import org.apache.spark.deploy.rest.mesos.MesosRestServer
 import org.apache.spark.scheduler.cluster.mesos._
-import org.apache.spark.util.SignalLogger
+import org.apache.spark.util.{ShutdownHookManager, SignalLogger}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /*
@@ -103,14 +103,11 @@ private[mesos] object MesosClusterDispatcher extends Logging {
     }
     val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
     dispatcher.start()
-    val shutdownHook = new Thread() {
-      override def run() {
-        logInfo("Shutdown hook is shutting down dispatcher")
-        dispatcher.stop()
-        dispatcher.awaitShutdown()
-      }
+    ShutdownHookManager.addShutdownHook { () =>
+      logInfo("Shutdown hook is shutting down dispatcher")
+      dispatcher.stop()
+      dispatcher.awaitShutdown()
     }
-    Runtime.getRuntime.addShutdownHook(shutdownHook)
     dispatcher.awaitShutdown()
   }
 }

ShutdownHookManager.scala:

@@ -162,7 +162,9 @@ private[spark] object ShutdownHookManager extends Logging {
       val hook = new Thread {
         override def run() {}
       }
+      // scalastyle:off runtimeaddshutdownhook
       Runtime.getRuntime.addShutdownHook(hook)
+      // scalastyle:on runtimeaddshutdownhook
       Runtime.getRuntime.removeShutdownHook(hook)
     } catch {
       case ise: IllegalStateException => return true
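
Judging from the surrounding context, the method patched here is Spark's probe for whether the JVM is already shutting down: Runtime.getRuntime.addShutdownHook throws IllegalStateException once shutdown has begun, so registering and immediately removing a dummy hook doubles as a test. A standalone sketch of the trick (the method name is hypothetical):

def inShutdownSketch(): Boolean = {
  try {
    val hook = new Thread { override def run(): Unit = () }
    // Registration is only legal before shutdown starts...
    Runtime.getRuntime.addShutdownHook(hook)
    // ...so if we got here, we are not shutting down; remove the probe.
    Runtime.getRuntime.removeShutdownHook(hook)
    false
  } catch {
    case _: IllegalStateException => true
  }
}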
@@ -228,7 +230,9 @@ private [util] class SparkShutdownHookManager {
           .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
 
       case Failure(_) =>
+        // scalastyle:off runtimeaddshutdownhook
         Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+        // scalastyle:on runtimeaddshutdownhook
     }
   }
 
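
The second hunk patches the registration path itself. From the context, SparkShutdownHookManager first tries to hand its single hook task to Hadoop's org.apache.hadoop.util.ShutdownHookManager (at fsPriority + 30, so Spark's hooks run before Hadoop's FileSystem cleanup), and only falls back to a raw Runtime hook when the Hadoop class is missing. A sketch of that pattern; hookTask and fsPriority come from the diff, the rest is reconstructed and should be read as an assumption:

import scala.util.{Failure, Success, Try}

def installSketch(hookTask: Runnable, fsPriority: Int): Unit = {
  Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
    case Success(shmClass) =>
      // Hadoop's manager is a singleton; addShutdownHook(Runnable, int)
      // takes the hook plus a priority (higher priorities run earlier).
      val shm = shmClass.getMethod("get").invoke(null)
      shmClass
        .getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
        .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
    case Failure(_) =>
      // Hadoop not on the classpath: fall back to a plain JVM hook.
      // scalastyle:off runtimeaddshutdownhook
      Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"))
      // scalastyle:on runtimeaddshutdownhook
  }
}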

scalastyle-config.xml (12 additions, 0 deletions):

@@ -157,6 +157,18 @@ This file is divided into 3 sections:
   ]]></customMessage>
   </check>
 
+  <check customId="runtimeaddshutdownhook" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter></parameters>
+    <customMessage><![CDATA[
+    Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In most cases, you should use
+    ShutdownHookManager.addShutdownHook instead.
+    If you must use Runtime.getRuntime.addShutdownHook, wrap the code block with
+    // scalastyle:off runtimeaddshutdownhook
+    Runtime.getRuntime.addShutdownHook(...)
+    // scalastyle:on runtimeaddshutdownhook
+    ]]></customMessage>
+  </check>
+
   <check customId="classforname" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">Class\.forName</parameter></parameters>
     <customMessage><![CDATA[
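
To make the new rule concrete: the RegexChecker fails the build on any literal occurrence of the pattern, and the paired scalastyle:off/on comments disable only this check id for the lines between them. A hypothetical call site for illustration:

// On its own, this call would now fail scalastyle with the message above:
//   Runtime.getRuntime.addShutdownHook(...)

// The sanctioned escape hatch scopes the suppression to these lines only:
// scalastyle:off runtimeaddshutdownhook
Runtime.getRuntime.addShutdownHook(new Thread { override def run(): Unit = () })
// scalastyle:on runtimeaddshutdownhook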

SparkSQLCLIDriver.scala:

@@ -195,20 +195,18 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     }
 
     // add shutdown hook to flush the history to history file
-    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable() {
-      override def run() = {
-        reader.getHistory match {
-          case h: FileHistory =>
-            try {
-              h.flush()
-            } catch {
-              case e: IOException =>
-                logWarning("WARNING: Failed to write command history file: " + e.getMessage)
-            }
-          case _ =>
-        }
-      }
-    }))
+    ShutdownHookManager.addShutdownHook { () =>
+      reader.getHistory match {
+        case h: FileHistory =>
+          try {
+            h.flush()
+          } catch {
+            case e: IOException =>
+              logWarning("WARNING: Failed to write command history file: " + e.getMessage)
+          }
+        case _ =>
+      }
+    }
 
     // TODO: missing
     /*