
Commit 120ab9d

Author: Andrew Or
Support kill and request driver status through SparkSubmit
1 parent 544de1d commit 120ab9d

File tree: 2 files changed, +88 -7 lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 31 additions & 3 deletions
@@ -27,6 +27,15 @@ import org.apache.spark.executor.ExecutorURLClassLoader
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.rest.StandaloneRestClient

+/**
+ * Whether to submit, kill, or request the status of an application.
+ * The latter two operations are currently supported only for standalone cluster mode.
+ */
+private[spark] object Action extends Enumeration {
+  type Action = Value
+  val SUBMIT, KILL, REQUEST_STATUS = Value
+}
+
 /**
  * Main gateway of launching a Spark application.
  *
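
Note on the enumeration above: the `type Action = Value` alias is what later allows `import org.apache.spark.deploy.Action.Action` to bring the value type into scope under the enum's own name. A minimal, self-contained sketch of the same pattern (the object and method names below are illustrative, not part of this commit):

    object EnumSketch {
      // Same shape as the new Action object: the type alias exposes the
      // enum's value type under the enum's own name.
      object Action extends Enumeration {
        type Action = Value
        val SUBMIT, KILL, REQUEST_STATUS = Value
      }
      import Action._

      // The alias type can then be used in signatures and matched on directly.
      def describe(action: Action): String = action match {
        case SUBMIT => "submit an application"
        case KILL => "kill an existing driver"
        case REQUEST_STATUS => "request a driver's status"
      }

      def main(args: Array[String]): Unit =
        println(describe(KILL)) // prints: kill an existing driver
    }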
@@ -73,11 +82,30 @@ object SparkSubmit {
     if (appArgs.verbose) {
       printStream.println(appArgs)
     }
-    launch(appArgs)
+    appArgs.action match {
+      case Action.SUBMIT => submit(appArgs)
+      case Action.KILL => kill(appArgs)
+      case Action.REQUEST_STATUS => requestStatus(appArgs)
+    }
+  }
+
+  /**
+   * Kill an existing driver using the stable REST protocol. Standalone cluster mode only.
+   */
+  private[spark] def kill(args: SparkSubmitArguments): Unit = {
+    new StandaloneRestClient().killDriver(args.master, args.driverToKill)
+  }
+
+  /**
+   * Request the status of an existing driver using the stable REST protocol.
+   * Standalone cluster mode only.
+   */
+  private[spark] def requestStatus(args: SparkSubmitArguments): Unit = {
+    new StandaloneRestClient().requestDriverStatus(args.master, args.driverToRequestStatusFor)
   }

   /**
-   * Launch the application using the provided parameters.
+   * Submit the application using the provided parameters.
    *
    * This runs in two steps. First, we prepare the launch environment by setting up
    * the appropriate classpath, system properties, and application arguments for
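
Aside: for these new paths, SparkSubmit's entire dependency on the REST layer is the pair killDriver/requestDriverStatus, each taking a master URL and a driver ID. A hedged sketch of that seam as a trait with a recording fake (DriverRestClient and RecordingRestClient are hypothetical names; the commit itself instantiates StandaloneRestClient directly):

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical trait mirroring the only two REST calls used above.
    trait DriverRestClient {
      def killDriver(master: String, driverId: String): Unit
      def requestDriverStatus(master: String, driverId: String): Unit
    }

    // A recording fake, e.g. for exercising the dispatch in a test.
    class RecordingRestClient extends DriverRestClient {
      val calls = ArrayBuffer.empty[String]
      def killDriver(master: String, driverId: String): Unit =
        calls += s"kill $driverId via $master"
      def requestDriverStatus(master: String, driverId: String): Unit =
        calls += s"status $driverId via $master"
    }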
@@ -89,7 +117,7 @@ object SparkSubmit {
    * main method of a child class. Instead, we pass the submit parameters directly to
    * a REST client, which will submit the application using the stable REST protocol.
    */
-  private[spark] def launch(args: SparkSubmitArguments): Unit = {
+  private[spark] def submit(args: SparkSubmitArguments): Unit = {
     // Environment needed to launch the child main class
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 57 additions & 4 deletions
@@ -23,6 +23,7 @@ import java.util.jar.JarFile
 import scala.collection.mutable.{ArrayBuffer, HashMap}

 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.Action.Action

 /**
  * Parses and encapsulates arguments from the spark-submit script.
@@ -39,8 +40,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var driverExtraClassPath: String = null
   var driverExtraLibraryPath: String = null
   var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
   var queue: String = null
   var numExecutors: String = null
   var files: String = null
@@ -55,6 +54,23 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var pyFiles: String = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

+  // Standalone cluster mode only
+  var supervise: Boolean = false
+  var driverCores: String = null
+  var driverToKill: String = null
+  var driverToRequestStatusFor: String = null
+
+  def action: Action = {
+    (driverToKill, driverToRequestStatusFor) match {
+      case (null, null) => Action.SUBMIT
+      case (_, null) => Action.KILL
+      case (null, _) => Action.REQUEST_STATUS
+      case _ => SparkSubmit.printErrorAndExit(
+        "Requested to both kill and request status for a driver. Choose only one.")
+        null // never reached
+    }
+  }
+
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
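
The `action` method above derives the requested operation from which of the two nullable fields is set, using a single tuple match with `null` patterns. A self-contained sketch of the same idiom (names and strings below are illustrative):

    object ActionSketch {
      // Hypothetical stand-ins for driverToKill / driverToRequestStatusFor.
      def derive(toKill: String, toStatus: String): String =
        (toKill, toStatus) match {
          case (null, null) => "SUBMIT"         // neither flag set
          case (_, null)    => "KILL"           // only --kill set
          case (null, _)    => "REQUEST_STATUS" // only --status set
          case _            => sys.error("Choose only one of --kill and --status")
        }

      def main(args: Array[String]): Unit = {
        println(derive(null, null))          // SUBMIT
        println(derive("driver-0001", null)) // KILL
      }
    }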
@@ -79,7 +95,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   // Use `sparkProperties` map along with env vars to fill in any missing parameters
   loadEnvironmentArguments()

-  checkRequiredArguments()
+  validateArguments()

   /**
    * Merge values from the default properties file with those specified through --conf.
@@ -171,7 +187,15 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   }

   /** Ensure that required fields exists. Call this only once all defaults are loaded. */
-  private def checkRequiredArguments(): Unit = {
+  private def validateArguments(): Unit = {
+    action match {
+      case Action.SUBMIT => validateSubmitArguments()
+      case Action.KILL => validateKillArguments()
+      case Action.REQUEST_STATUS => validateStatusRequestArguments()
+    }
+  }
+
+  private def validateSubmitArguments(): Unit = {
     if (args.length == 0) {
       printUsageAndExit(-1)
     }
@@ -206,6 +230,25 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     }
   }

+  private def validateKillArguments(): Unit = {
+    if (!master.startsWith("spark://") || deployMode != "cluster") {
+      SparkSubmit.printErrorAndExit("Killing drivers is only supported in standalone cluster mode")
+    }
+    if (driverToKill == null) {
+      SparkSubmit.printErrorAndExit("Please specify a driver to kill")
+    }
+  }
+
+  private def validateStatusRequestArguments(): Unit = {
+    if (!master.startsWith("spark://") || deployMode != "cluster") {
+      SparkSubmit.printErrorAndExit(
+        "Requesting driver statuses is only supported in standalone cluster mode")
+    }
+    if (driverToRequestStatusFor == null) {
+      SparkSubmit.printErrorAndExit("Please specify a driver to request status for")
+    }
+  }
+
   override def toString = {
     s"""Parsed arguments:
     |  master                  $master
@@ -312,6 +355,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         propertiesFile = value
         parse(tail)

+      case ("--kill") :: value :: tail =>
+        driverToKill = value
+        parse(tail)
+
+      case ("--status") :: value :: tail =>
+        driverToRequestStatusFor = value
+        parse(tail)
+
       case ("--supervise") :: tail =>
         supervise = true
         parse(tail)
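
The two new cases follow the parse method's existing idiom: a `::` list pattern that binds the flag, its value, and the remaining arguments, then recurses on the tail. A minimal self-contained sketch of that style (a real parser handles many more flags; the field names are borrowed for illustration):

    object ParseSketch {
      var driverToKill: String = null
      var driverToRequestStatusFor: String = null

      // Each case peels off a flag (and its value) and recurses on the tail.
      def parse(opts: List[String]): Unit = opts match {
        case "--kill" :: value :: tail =>
          driverToKill = value
          parse(tail)
        case "--status" :: value :: tail =>
          driverToRequestStatusFor = value
          parse(tail)
        case _ :: tail =>
          parse(tail) // the real parser errors on unknown flags instead
        case Nil => // done
      }

      def main(args: Array[String]): Unit = {
        parse(List("--kill", "driver-0001"))
        println(driverToKill) // driver-0001
      }
    }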
@@ -410,6 +461,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
        | Spark standalone with cluster deploy mode only:
        |  --driver-cores NUM          Cores for driver (Default: 1).
        |  --supervise                 If given, restarts the driver on failure.
+        |  --kill DRIVER_ID            If given, kills the driver specified.
+        |  --status DRIVER_ID          If given, requests the status of the driver specified.
        |
        | Spark standalone and Mesos only:
        |  --total-executor-cores NUM  Total cores for all executors.
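
For context, a hypothetical invocation of each new flag (host, port, and driver ID are invented for illustration; per the validation added above, both operations require a spark:// master and cluster deploy mode):

    ./bin/spark-submit --master spark://203.0.113.7:7077 --deploy-mode cluster --kill driver-20150219114244-0001
    ./bin/spark-submit --master spark://203.0.113.7:7077 --deploy-mode cluster --status driver-20150219114244-0001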
