
Commit 25c5ae6

Author: Marcelo Vanzin (committed)
Centralize SparkSubmit command line parsing.
Use a common base class to parse SparkSubmit command line arguments. This forces anyone who wants to add new arguments to modify the shared parser, updating all code that needs to know about SparkSubmit options in the process. Also create some constants to avoid copy & pasting strings around to actually process the options.
1 parent 27be98a commit 25c5ae6

5 files changed: 473 additions, 184 deletions

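The shared base class itself (org.apache.spark.launcher.SparkSubmitOptionParser, added to the launcher module by this commit) does not appear in the excerpt below. The following Scala sketch of its contract is inferred from how SparkSubmitArguments overrides it in the diff; the callback names and the parse(List) entry point match the diff, while the constant values and exact visibility are assumptions (the real class is written in Java).

import java.util.{List => JList}

// Hedged sketch of the shared parser contract, inferred from the diff below.
// The real class is Java and lives in the launcher module; details are assumed.
object OptionParserContractSketch {
  // Option strings live in one place so that callers such as SparkSubmitArguments
  // never copy & paste them (only a few shown; the values are assumptions).
  val MASTER = "--master"
  val DEPLOY_MODE = "--deploy-mode"
  val CONF = "--conf"
}

abstract class OptionParserContractSketch {

  /** Called once per recognized option and its value; return true to keep parsing. */
  protected def handle(opt: String, value: String): Boolean

  /** Called for the first unrecognized argument; return false to stop option parsing. */
  protected def handleUnknown(opt: String): Boolean

  /** Called with whatever is left over once option parsing stops. */
  protected def handleExtraArgs(extra: JList[String]): Unit

  /** Provided by the real class: walks the arguments and dispatches to the callbacks above. */
  protected def parse(args: JList[String]): Unit
}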

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 64 additions & 77 deletions

@@ -18,17 +18,21 @@
 package org.apache.spark.deploy

 import java.net.URI
+import java.util.{List => JList}
 import java.util.jar.JarFile

+import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}

+import org.apache.spark.launcher.SparkSubmitOptionParser
 import org.apache.spark.util.Utils

 /**
  * Parses and encapsulates arguments from the spark-submit script.
  * The env argument is used for testing.
  */
-private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) {
+private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
+  extends SparkSubmitOptionParser {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -73,7 +77,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   }

   // Set parameters from command line arguments
-  parseOpts(args.toList)
+  try {
+    parse(args.toList)
+  } catch {
+    case e: IllegalArgumentException =>
+      SparkSubmit.printErrorAndExit(e.getMessage())
+  }
   // Populate `sparkProperties` map from properties file
   mergeDefaultSparkProperties()
   // Use `sparkProperties` map along with env vars to fill in any missing parameters
@@ -224,138 +233,116 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     """.stripMargin
   }

-  /**
-   * Fill in values by parsing user options.
-   * NOTE: Any changes here must be reflected in YarnClientSchedulerBackend.
-   */
-  private def parseOpts(opts: Seq[String]): Unit = {
-    val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r
-
-    // Delineates parsing of Spark options from parsing of user options.
-    parse(opts)
-
-    /**
-     * NOTE: If you add or remove spark-submit options,
-     * modify NOT ONLY this file but also utils.sh
-     */
-    def parse(opts: Seq[String]): Unit = opts match {
-      case ("--name") :: value :: tail =>
+  import SparkSubmitOptionParser._
+
+  /** Fill in values by parsing user options. */
+  override protected def handle(opt: String, value: String): Boolean = {
+    opt match {
+      case NAME =>
         name = value
-        parse(tail)

-      case ("--master") :: value :: tail =>
+      case MASTER =>
         master = value
-        parse(tail)

-      case ("--class") :: value :: tail =>
+      case CLASS =>
         mainClass = value
-        parse(tail)

-      case ("--deploy-mode") :: value :: tail =>
+      case DEPLOY_MODE =>
         if (value != "client" && value != "cluster") {
           SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
         }
         deployMode = value
-        parse(tail)

-      case ("--num-executors") :: value :: tail =>
+      case NUM_EXECUTORS =>
         numExecutors = value
-        parse(tail)

-      case ("--total-executor-cores") :: value :: tail =>
+      case TOTAL_EXECUTOR_CORES =>
         totalExecutorCores = value
-        parse(tail)

-      case ("--executor-cores") :: value :: tail =>
+      case EXECUTOR_CORES =>
         executorCores = value
-        parse(tail)

-      case ("--executor-memory") :: value :: tail =>
+      case EXECUTOR_MEMORY =>
         executorMemory = value
-        parse(tail)

-      case ("--driver-memory") :: value :: tail =>
+      case DRIVER_MEMORY =>
         driverMemory = value
-        parse(tail)

-      case ("--driver-cores") :: value :: tail =>
+      case DRIVER_CORES =>
         driverCores = value
-        parse(tail)

-      case ("--driver-class-path") :: value :: tail =>
+      case DRIVER_CLASS_PATH =>
         driverExtraClassPath = value
-        parse(tail)

-      case ("--driver-java-options") :: value :: tail =>
+      case DRIVER_JAVA_OPTIONS =>
         driverExtraJavaOptions = value
-        parse(tail)

-      case ("--driver-library-path") :: value :: tail =>
+      case DRIVER_LIBRARY_PATH =>
         driverExtraLibraryPath = value
-        parse(tail)

-      case ("--properties-file") :: value :: tail =>
+      case PROPERTIES_FILE =>
         propertiesFile = value
-        parse(tail)

-      case ("--supervise") :: tail =>
+      case SUPERVISE =>
         supervise = true
-        parse(tail)

-      case ("--queue") :: value :: tail =>
+      case QUEUE =>
         queue = value
-        parse(tail)

-      case ("--files") :: value :: tail =>
+      case FILES =>
         files = Utils.resolveURIs(value)
-        parse(tail)

-      case ("--py-files") :: value :: tail =>
+      case PY_FILES =>
         pyFiles = Utils.resolveURIs(value)
-        parse(tail)

-      case ("--archives") :: value :: tail =>
+      case ARCHIVES =>
         archives = Utils.resolveURIs(value)
-        parse(tail)

-      case ("--jars") :: value :: tail =>
+      case JARS =>
         jars = Utils.resolveURIs(value)
-        parse(tail)

-      case ("--conf" | "-c") :: value :: tail =>
+      case CONF =>
         value.split("=", 2).toSeq match {
           case Seq(k, v) => sparkProperties(k) = v
           case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
         }
-        parse(tail)

-      case ("--help" | "-h") :: tail =>
+      case HELP =>
         printUsageAndExit(0)

-      case ("--verbose" | "-v") :: tail =>
+      case VERBOSE =>
         verbose = true
-        parse(tail)

-      case EQ_SEPARATED_OPT(opt, value) :: tail =>
-        parse(opt :: value :: tail)
+      case _ =>
+        throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
+    }
+    true
+  }

-      case value :: tail if value.startsWith("-") =>
-        SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+  /**
+   * The first unrecognized option is treated as the "primary resource". Everything else is
+   * treated as application arguments.
+   */
+  override protected def handleUnknown(opt: String): Boolean = {
+    if (opt.startsWith("-")) {
+      SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+    }

-      case value :: tail =>
-        primaryResource =
-          if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
-            Utils.resolveURI(value).toString
-          } else {
-            value
-          }
-        isPython = SparkSubmit.isPython(value)
-        childArgs ++= tail
+    primaryResource =
+      if (!SparkSubmit.isShell(opt) && !SparkSubmit.isInternal(opt)) {
+        Utils.resolveURI(opt).toString
+      } else {
+        opt
+      }
+    isPython = SparkSubmit.isPython(opt)
+    false
+  }

-      case Nil =>
-    }
+  override protected def handleExtraArgs(extra: JList[String]): Unit = {
+    childArgs ++= extra
   }

+
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
     val outStream = SparkSubmit.printStream
     if (unknownParam != null) {
launcher/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@
4242
<artifactId>log4j</artifactId>
4343
<scope>test</scope>
4444
</dependency>
45+
<dependency>
46+
<groupId>org.mockito</groupId>
47+
<artifactId>mockito-all</artifactId>
48+
<scope>test</scope>
49+
</dependency>
4550
<dependency>
4651
<groupId>org.scalatest</groupId>
4752
<artifactId>scalatest_${scala.binary.version}</artifactId>
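The new mockito-all test dependency suggests how the shared parser is exercised in the launcher module's tests: spy on a parser subclass, feed it a command line, and verify which callbacks fire. The following is only a hedged sketch, not the test suite added by this commit; the class, helper, and test names are invented, and it assumes parse() accepts a list and is reachable from a subclass, as shown in the Scala diff above.

import scala.collection.JavaConversions._

import org.mockito.Mockito._
import org.scalatest.FunSuite

import org.apache.spark.launcher.SparkSubmitOptionParser

// No-op subclass so that Mockito can spy on the parser callbacks (hypothetical).
class RecordingParser extends SparkSubmitOptionParser {
  override def handle(opt: String, value: String): Boolean = true
  override def handleUnknown(opt: String): Boolean = false
  override def handleExtraArgs(extra: java.util.List[String]): Unit = { }

  // Public wrapper so the test can invoke the (presumably protected) parse().
  def doParse(args: Seq[String]): Unit = parse(args.toList)
}

class SparkSubmitOptionParserSketchSuite extends FunSuite {

  test("recognized and unknown arguments reach the right callbacks") {
    val parser = spy(new RecordingParser())
    parser.doParse(Seq("--master", "local", "SparkPi.jar", "100"))

    verify(parser).handle("--master", "local")   // recognized option
    verify(parser).handleUnknown("SparkPi.jar")  // primary resource
    // "100" should end up in handleExtraArgs(); matching the exact
    // java.util.List argument is omitted to keep the sketch simple.
  }
}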
