
Commit 40e6095

Author: Andrew Or (committed)

Pass submit parameters through system properties

This decouples SparkSubmitArguments from StandaloneRestClient such that the latter can now be launched as a main class, just like any other invocation of Spark submit.

1 parent cbd670b  commit 40e6095
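
The commit message's contract is that everything the REST client needs (master URL, application resource, main class, driver settings) arrives as -Dspark.* JVM system properties, while only the application arguments travel on the command line. Below is a minimal sketch, not code from this commit, of how any plain main class can recover those parameters; it mirrors what the new StandaloneRestClient.main in the diffs further down actually does.

import org.apache.spark.SparkConf

object SystemPropertiesSketch {
  def main(args: Array[String]): Unit = {
    // new SparkConf() (loadDefaults = true) picks up every -Dspark.* system property
    val conf = new SparkConf()
    val master = conf.get("spark.master")     // e.g. spark://host:6066
    val sparkProperties = conf.getAll.toMap   // forwarded verbatim to the REST server
    println(s"would submit to $master with ${sparkProperties.size} properties set")
  }
}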

File tree

9 files changed: 171 additions, 214 deletions

core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
Lines changed: 1 addition & 1 deletion

@@ -49,7 +49,7 @@ private[spark] class ClientArguments(args: Array[String]) {
 
   parse(args.toList)
 
-  def parse(args: List[String]): Unit = args match {
+  private def parse(args: List[String]): Unit = args match {
     case ("--cores" | "-c") :: IntParam(value) :: tail =>
       cores = value
       parse(tail)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Lines changed: 30 additions & 36 deletions

@@ -138,28 +138,22 @@ object SparkSubmit {
    */
   private[spark] def submit(args: SparkSubmitArguments): Unit = {
     val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
-    /*
-     * In standalone cluster mode, there are two submission gateways:
-     * (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
-     * (2) The new REST-based gateway introduced in Spark 1.3
-     * The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
-     * to use the legacy gateway if the master endpoint turns out to be not a REST server.
-     */
-    if (args.isStandaloneCluster) {
+    // In standalone cluster mode, there are two submission gateways:
+    // (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+    // (2) The new REST-based gateway introduced in Spark 1.3
+    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
+    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
+    if (args.isStandaloneCluster && args.useRest) {
       try {
         printStream.println("Running Spark using the REST application submission protocol.")
-        val client = new StandaloneRestClient
-        val response = client.createSubmission(args)
-        response match {
-          case s: CreateSubmissionResponse => handleRestResponse(s)
-          case r => handleUnexpectedRestResponse(r)
-        }
+        runMain(childArgs, childClasspath, sysProps, childMainClass)
       } catch {
         // Fail over to use the legacy submission gateway
         case e: SubmitRestConnectionException =>
           printStream.println(s"Master endpoint ${args.master} was not a " +
             s"REST server. Falling back to legacy submission gateway instead.")
-          runMain(childArgs, childClasspath, sysProps, childMainClass)
+          args.useRest = false
+          submit(args)
       }
     // In all other modes, just run the main class as prepared
     } else {

@@ -172,9 +166,8 @@
    * This returns a 4-tuple:
    * (1) the arguments for the child process,
    * (2) a list of classpath entries for the child,
-   * (3) a list of system properties and env vars, and
+   * (3) a map of system properties, and
    * (4) the main class for the child
-   * In standalone cluster mode, this mutates the original arguments passed in.
    * Exposed for testing.
    */
   private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)

@@ -319,8 +312,12 @@
       // Standalone cluster only
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
       OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
+      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
+      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
+      OptionAssigner(args.primaryResource, STANDALONE, CLUSTER, sysProp = "spark.app.resource"),
+      OptionAssigner(args.mainClass, STANDALONE, CLUSTER, sysProp = "spark.app.mainClass"),
+      OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
+        sysProp = "spark.driver.supervise"),
 
       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),

@@ -382,15 +379,22 @@
       sysProps.put("spark.jars", jars.mkString(","))
     }
 
-    // In standalone-cluster mode, use Client as a wrapper around the user class
-    // Note that we won't actually launch this class if we're using the REST protocol
+    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
+    // All parameters except application arguments are expected to be passed to the main class
+    // through system properties.
     if (args.isStandaloneCluster) {
-      childMainClass = "org.apache.spark.deploy.Client"
-      if (args.supervise) {
-        childArgs += "--supervise"
+      if (args.useRest) {
+        childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
+      } else {
+        // In legacy standalone cluster mode, use Client as a wrapper around the user class
+        childMainClass = "org.apache.spark.deploy.Client"
+        if (args.supervise) { childArgs += "--supervise" }
+        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
+        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
+        childArgs += "launch"
+        childArgs += (args.master, args.primaryResource, args.mainClass)
       }
-      childArgs += "launch"
-      childArgs += (args.master, args.primaryResource, args.mainClass)
+      // Whether or not we use REST, pass application arguments through the command line
       if (args.childArgs != null) {
        childArgs ++= args.childArgs
       }

@@ -453,16 +457,6 @@
       sysProps("spark.submit.pyFiles") = formattedPyFiles
     }
 
-    // NOTE: If we are using the REST gateway, we will use the original arguments directly.
-    // Since we mutate the values of some configs in this method, we must update the
-    // corresponding fields in the original SparkSubmitArguments to reflect these changes.
-    if (args.isStandaloneCluster) {
-      args.sparkProperties.clear()
-      args.sparkProperties ++= sysProps
-      sysProps.get("spark.jars").foreach { args.jars = _ }
-      sysProps.get("spark.files").foreach { args.files = _ }
-    }
-
     (childArgs, childClasspath, sysProps, childMainClass)
   }
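
The submit() hunk above introduces a one-shot failover: try the REST gateway first, and on SubmitRestConnectionException flip useRest and re-enter submit(), so the second pass prepares the legacy o.a.s.deploy.Client invocation instead. The following is a self-contained sketch of that control flow; every name in it is illustrative rather than Spark's.

object FailoverSketch {
  final class ConnectionException(msg: String) extends Exception(msg)

  var useRest = true  // plays the role of SparkSubmitArguments.useRest

  def submit(appJar: String): Unit = {
    if (useRest) {
      try restSubmit(appJar) catch {
        case _: ConnectionException =>
          useRest = false  // remember the failure...
          submit(appJar)   // ...and re-enter, taking the legacy branch this time
      }
    } else {
      legacySubmit(appJar)
    }
  }

  // Stand-ins for the two gateways: REST always fails here to force the fallback.
  private def restSubmit(appJar: String): Unit =
    throw new ConnectionException("master endpoint is not a REST server")
  private def legacySubmit(appJar: String): Unit =
    println(s"launched $appJar via the legacy gateway")

  def main(args: Array[String]): Unit = submit("app.jar")
}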

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
Lines changed: 1 addition & 0 deletions

@@ -59,6 +59,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
 
   // Standalone cluster mode only
+  var useRest: Boolean = true
   var supervise: Boolean = false
   var driverCores: String = null
   var driverToKill: String = null

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
Lines changed: 32 additions & 35 deletions

@@ -24,8 +24,7 @@ import scala.io.Source
 
 import com.google.common.base.Charsets
 
-import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion}
-import org.apache.spark.deploy.SparkSubmitArguments
+import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
 
 /**
  * A client that submits applications to the standalone Master using a REST protocol.

@@ -55,12 +54,15 @@ private[spark] class StandaloneRestClient extends Logging {
    * If the submission was successful, poll the status of the submission and report
    * it to the user. Otherwise, report the error message provided by the server.
    */
-  def createSubmission(args: SparkSubmitArguments): SubmitRestProtocolResponse = {
-    logInfo(s"Submitting a request to launch a driver in ${args.master}.")
-    validateSubmitArgs(args)
-    val master = args.master
+  def createSubmission(
+      master: String,
+      appArgs: Array[String],
+      sparkProperties: Map[String, String],
+      environmentVariables: Map[String, String]): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request to launch a driver in $master.")
+    validateMaster(master)
     val url = getSubmitUrl(master)
-    val request = constructSubmitRequest(args)
+    val request = constructSubmitRequest(appArgs, sparkProperties, environmentVariables)
     val response = postJson(url, request.toJson)
     response match {
       case s: CreateSubmissionResponse => reportSubmissionStatus(master, s)

@@ -182,36 +184,16 @@
     }
   }
 
-  /** Throw an exception if this is not standalone cluster mode. */
-  private def validateSubmitArgs(args: SparkSubmitArguments): Unit = {
-    if (!args.isStandaloneCluster) {
-      throw new IllegalArgumentException(
-        "This REST client is only supported in standalone cluster mode.")
-    }
-  }
-
   /** Construct a message that captures the specified parameters for submitting an application. */
-  private def constructSubmitRequest(args: SparkSubmitArguments): CreateSubmissionRequest = {
+  def constructSubmitRequest(
+      appArgs: Array[String],
+      sparkProperties: Map[String, String],
+      environmentVariables: Map[String, String]): CreateSubmissionRequest = {
     val message = new CreateSubmissionRequest
     message.clientSparkVersion = sparkVersion
-    message.appName = args.name
-    message.appResource = args.primaryResource
-    message.mainClass = args.mainClass
-    message.jars = args.jars
-    message.files = args.files
-    message.driverMemory = args.driverMemory
-    message.driverCores = args.driverCores
-    message.driverExtraJavaOptions = args.driverExtraJavaOptions
-    message.driverExtraClassPath = args.driverExtraClassPath
-    message.driverExtraLibraryPath = args.driverExtraLibraryPath
-    message.superviseDriver = args.supervise.toString
-    message.executorMemory = args.executorMemory
-    message.totalExecutorCores = args.totalExecutorCores
-    args.childArgs.foreach(message.addAppArg)
-    args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) }
-    sys.env.foreach { case (k, v) =>
-      if (k.startsWith("SPARK_")) { message.setEnvironmentVariable(k, v) }
-    }
+    message.appArgs = appArgs
+    message.sparkProperties = sparkProperties
+    message.environmentVariables = environmentVariables
     message.validate()
     message
   }

@@ -273,8 +255,23 @@
     }
   }
 
-private object StandaloneRestClient {
+private[spark] object StandaloneRestClient {
   val REPORT_DRIVER_STATUS_INTERVAL = 1000
   val REPORT_DRIVER_STATUS_MAX_TRIES = 10
   val PROTOCOL_VERSION = "v1"
+
+  /**
+   * Submit an application, assuming parameters are specified through system properties.
+   * Usage: StandaloneRestClient [app args*]
+   */
+  def main(args: Array[String]): Unit = {
+    val client = new StandaloneRestClient
+    val appArgs = args.slice(1, args.size)
+    val master = sys.props.get("spark.master").getOrElse {
+      throw new IllegalArgumentException("'spark.master' must be set.")
+    }
+    val sparkProperties = new SparkConf().getAll.toMap
+    val environmentVariables = sys.env.filter { case (k, _) => k.startsWith("SPARK_") }
+    client.createSubmission(master, appArgs, sparkProperties, environmentVariables)
+  }
 }
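
With the main method added above, the client can be started like any other Spark main class once the spark.* properties are set on the child JVM. A hypothetical invocation follows; the host, paths, and classpath are placeholders, and only the property names come from this commit.

import scala.sys.process._

object LaunchRestClientSketch {
  def main(args: Array[String]): Unit = {
    val exitCode = Seq(
      "java",
      "-Dspark.master=spark://master-host:6066",     // REST endpoint of the Master
      "-Dspark.app.resource=file:/path/to/app.jar",  // primary resource (placeholder)
      "-Dspark.app.mainClass=com.example.Main",      // user main class (placeholder)
      "-Dspark.driver.memory=2g",
      "-Dspark.driver.supervise=false",
      "-cp", "/path/to/spark-assembly.jar",          // placeholder classpath
      "org.apache.spark.deploy.rest.StandaloneRestClient"
    ).!  // run the child JVM and wait for it to exit
    println(s"REST client exited with status $exitCode")
  }
}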

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
Lines changed: 18 additions & 28 deletions

@@ -146,7 +146,10 @@ private[spark] abstract class StandaloneRestServlet extends HttpServlet with Log
     val clientSideJson = parse(requestJson)
     val serverSideJson = parse(requestMessage.toJson)
     val Diff(_, _, unknown) = clientSideJson.diff(serverSideJson)
-    unknown.asInstanceOf[JObject].obj.map { case (k, _) => k }.toArray
+    unknown match {
+      case j: JObject => j.obj.map { case (k, _) => k }.toArray
+      case _ => Array.empty[String] // No difference
+    }
   }
 
   /** Return a human readable String representation of the exception. */

@@ -334,43 +337,30 @@ private[spark] class SubmitRequestServlet(master: Master) extends StandaloneRest
    * cluster mode yet.
    */
   private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = {
+    val sparkProperties = request.sparkProperties
+
     // Required fields, including the main class because python is not yet supported
-    val appName = request.appName
-    val appResource = request.appResource
-    val mainClass = request.mainClass
-    if (mainClass == null) {
-      throw new SubmitRestMissingFieldException("Main class must be set in submit request.")
+    val appResource = sparkProperties.get("spark.app.resource").getOrElse {
+      throw new SubmitRestMissingFieldException("Main application resource is missing.")
+    }
+    val mainClass = sparkProperties.get("spark.app.mainClass").getOrElse {
+      throw new SubmitRestMissingFieldException("Main class is missing.")
     }
 
     // Optional fields
-    val jars = Option(request.jars)
-    val files = Option(request.files)
-    val driverMemory = Option(request.driverMemory)
-    val driverCores = Option(request.driverCores)
-    val driverExtraJavaOptions = Option(request.driverExtraJavaOptions)
-    val driverExtraClassPath = Option(request.driverExtraClassPath)
-    val driverExtraLibraryPath = Option(request.driverExtraLibraryPath)
-    val superviseDriver = Option(request.superviseDriver)
-    val executorMemory = Option(request.executorMemory)
-    val totalExecutorCores = Option(request.totalExecutorCores)
+    val driverMemory = sparkProperties.get("spark.driver.memory")
+    val driverCores = sparkProperties.get("spark.driver.cores")
+    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
+    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
+    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
+    val superviseDriver = sparkProperties.get("spark.driver.supervise")
     val appArgs = request.appArgs
-    val sparkProperties = request.sparkProperties
     val environmentVariables = request.environmentVariables
 
-    // Translate all fields to the relevant Spark properties
+    // Construct driver description and submit it
     val conf = new SparkConf(false)
       .setAll(sparkProperties)
       .set("spark.master", master.masterUrl)
-      .set("spark.app.name", appName)
-    jars.foreach { j => conf.set("spark.jars", j) }
-    files.foreach { f => conf.set("spark.files", f) }
-    driverExtraJavaOptions.foreach { j => conf.set("spark.driver.extraJavaOptions", j) }
-    driverExtraClassPath.foreach { cp => conf.set("spark.driver.extraClassPath", cp) }
-    driverExtraLibraryPath.foreach { lp => conf.set("spark.driver.extraLibraryPath", lp) }
-    executorMemory.foreach { m => conf.set("spark.executor.memory", m) }
-    totalExecutorCores.foreach { c => conf.set("spark.cores.max", c) }
-
-    // Construct driver description and submit it
     val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
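
The first hunk in this file replaces an unchecked asInstanceOf[JObject] with a pattern match, because json4s's diff produces JNothing rather than a JObject when the two documents are identical, so the cast would throw a ClassCastException. Below is a standalone sketch of the unknown-field check, assuming the json4s-jackson dependency Spark already uses; the object and method names are illustrative.

import org.json4s._
import org.json4s.jackson.JsonMethods._

object UnknownFieldsSketch {
  /** Return the field names present in clientJson but absent from serverJson. */
  def unknownFields(clientJson: String, serverJson: String): Array[String] = {
    val Diff(_, _, unknown) = parse(clientJson).diff(parse(serverJson))
    unknown match {
      case j: JObject => j.obj.map { case (k, _) => k }.toArray
      case _ => Array.empty[String] // JNothing: no unexpected fields
    }
  }

  def main(args: Array[String]): Unit = {
    val client = """{"action": "CreateSubmissionRequest", "bogusField": 1}"""
    val server = """{"action": "CreateSubmissionRequest"}"""
    println(unknownFields(client, server).mkString(", ")) // prints: bogusField
    println(unknownFields(server, server).length)         // prints: 0
  }
}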

core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala
Lines changed: 4 additions & 5 deletions

@@ -83,8 +83,7 @@ private[spark] abstract class SubmitRestProtocolMessage {
   /** Assert that the specified field is set in this message. */
   protected def assertFieldIsSet(value: String, name: String): Unit = {
     if (value == null) {
-      throw new SubmitRestMissingFieldException(
-        s"Field '$name' is missing in message $messageType.")
+      throw new SubmitRestMissingFieldException(s"'$name' is missing in message $messageType.")
     }
   }
 

@@ -93,7 +92,7 @@ private[spark] abstract class SubmitRestProtocolMessage {
     if (value != null) {
       Try(value.toBoolean).getOrElse {
         throw new SubmitRestProtocolException(
-          s"Field '$name' expected boolean value: actual was '$value'.")
+          s"'$name' expected boolean value: actual was '$value'.")
       }
     }
   }

@@ -103,7 +102,7 @@ private[spark] abstract class SubmitRestProtocolMessage {
     if (value != null) {
       Try(value.toInt).getOrElse {
         throw new SubmitRestProtocolException(
-          s"Field '$name' expected numeric value: actual was '$value'.")
+          s"'$name' expected numeric value: actual was '$value'.")
       }
     }
   }

@@ -116,7 +115,7 @@ private[spark] abstract class SubmitRestProtocolMessage {
     if (value != null) {
       Try(Utils.memoryStringToMb(value)).getOrElse {
         throw new SubmitRestProtocolException(
-          s"Field '$name' expected memory value: actual was '$value'.")
+          s"'$name' expected memory value: actual was '$value'.")
       }
     }
   }
