package org.apache.spark.deploy

- import scala.collection.mutable.ArrayBuffer
- import java.io.File
+ import java.io.{File, FileInputStream, IOException}
+ import java.util.Properties
+
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.{HashMap, ArrayBuffer}
+
+ import org.apache.spark.SparkException

/**
 * Parses and encapsulates arguments from the spark-submit script.
 */
private[spark] class SparkSubmitArguments(args: Array[String]) {
-   var master: String = "local"
+   var master: String = null
  var deployMode: String = null
  var executorMemory: String = null
  var executorCores: String = null
@@ -47,22 +52,70 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
  var jars: String = null
  var verbose: Boolean = false

-   loadEnvVars()
  parseOpts(args.toList)
+   loadDefaults()
+   checkRequiredArguments()
+
+   /** Return defaults present in the currently defined defaults file. */
+   def getDefaultSparkProperties = {
+     val defaultProperties = new HashMap[String, String]()
+     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
+     Option(propertiesFile).foreach { filename =>
+       val file = new File(filename)
+       SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
+         if (k.startsWith("spark")) {
+           defaultProperties(k) = v
+           if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+         } else {
+           SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+         }
+       }
+     }
+     defaultProperties
+   }

-   // Sanity checks
-   if (args.length == 0) printUsageAndExit(-1)
-   if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
-   if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
-   if (propertiesFile == null) {
-     sys.env.get("SPARK_HOME").foreach { sparkHome =>
-       val sep = File.separator
-       val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.properties"
-       val file = new File(defaultPath)
-       if (file.exists()) {
-         propertiesFile = file.getAbsolutePath
-       }
+   /** Fill in any undefined values based on the current properties file or built-in defaults. */
+   private def loadDefaults() = {
+
+     // Use common defaults file, if not specified by user
+     if (propertiesFile == null) {
+       sys.env.get("SPARK_HOME").foreach { sparkHome =>
+         val sep = File.separator
+         val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.properties"
+         val file = new File(defaultPath)
+         if (file.exists()) {
+           propertiesFile = file.getAbsolutePath
+         }
+       }
    }
+
+     val defaultProperties = getDefaultSparkProperties
+     // Use properties file as fallback for values which have a direct analog to
+     // arguments in this script.
+     master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+     executorMemory = Option(executorMemory)
+       .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+     executorCores = Option(executorCores)
+       .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+     totalExecutorCores = Option(totalExecutorCores)
+       .getOrElse(defaultProperties.get("spark.cores.max").orNull)
+     name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
+     jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+
+     // This supports env vars in older versions of Spark
+     master = Option(master).getOrElse(System.getenv("MASTER"))
+     deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+
+     // Global defaults. These should be kept to a minimum to avoid confusing behavior.
+     master = Option(master).getOrElse("local")
+   }
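To make the resolution order wired up by loadDefaults() above concrete, here is a minimal standalone sketch, not part of the patch. The names resolveMaster, cliMaster, and defaults are hypothetical stand-ins for the real fields; it shows how a single setting such as the master URL falls through the layers: an explicit --master flag, then spark.master from the defaults file, then the legacy MASTER environment variable, and finally the built-in "local" default.

object MasterResolutionSketch {
  // Hypothetical helper mirroring the Option(...).getOrElse(...) chain used in loadDefaults().
  def resolveMaster(
      cliMaster: String,                                  // value from --master, or null
      defaults: Map[String, String]): String = {
    Option(cliMaster)
      .orElse(defaults.get("spark.master"))               // spark-defaults.properties entry
      .orElse(Option(System.getenv("MASTER")))            // legacy env var support
      .getOrElse("local")                                 // built-in global default
  }

  def main(args: Array[String]): Unit = {
    // No flag, no defaults entry: env var (if set) or "local" wins.
    println(resolveMaster(null, Map.empty))
    // A defaults-file entry overrides the env var but not an explicit flag.
    println(resolveMaster(null, Map("spark.master" -> "spark://host:7077")))
    println(resolveMaster("yarn", Map("spark.master" -> "spark://host:7077")))
  }
}

The same fallback pattern is applied above to executor memory, executor cores, the application name, and jars.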
+
+   /** Ensure that required fields exist. Call this only once all defaults are loaded. */
+   private def checkRequiredArguments() = {
+     if (args.length == 0) printUsageAndExit(-1)
+     if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+     if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
  }

  override def toString = {
@@ -89,14 +142,12 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
    |  childArgs       [${childArgs.mkString(" ")}]
    |  jars            $jars
    |  verbose         $verbose
+     |
+     |Default properties from $propertiesFile:
+     |${getDefaultSparkProperties.mkString("  ", "\n  ", "\n")}
    """.stripMargin
  }

-   private def loadEnvVars() {
-     Option(System.getenv("MASTER")).map(master = _)
-     Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
-   }
-
  private def parseOpts(opts: List[String]): Unit = opts match {
    case ("--name") :: value :: tail =>
      name = value
@@ -189,6 +240,18 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
      parseOpts(tail)

    case value :: tail =>
+       if (value.startsWith("-")) {
+         val errMessage = s"Unrecognized option '$value'."
+         val suggestion: Option[String] = value match {
+           case v if v.startsWith("--") && v.contains("=") =>
+             val parts = v.split("=")
+             Some(s"Perhaps you meant '${parts(0)} ${parts(1)}'?")
+           case _ =>
+             None
+         }
+         SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
+       }
+
      if (primaryResource != null) {
        val error = s"Found two conflicting resources, $value and $primaryResource. " +
          "Expecting only one resource."
@@ -217,6 +280,8 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
        |  --jars JARS                 A comma-separated list of local jars to include on the
        |                              driver classpath and that SparkContext.addJar will work
        |                              with. Doesn't work on standalone with 'cluster' deploy mode.
+         |  --files FILES               Comma separated list of files to be placed in the working dir
+         |                              of each executor.
        |  --properties-file FILE      Path to a file from which to load extra properties. If not
        |                              specified, this will look for conf/spark-defaults.properties.
        |
@@ -225,6 +290,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
        |  --driver-library-path       Extra library path entries to pass to the driver
        |  --driver-class-path         Extra class path entries to pass to the driver
        |
+         |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
        |
        | Spark standalone with cluster deploy mode only:
        |  --driver-cores NUM          Cores for driver (Default: 1).
@@ -235,14 +301,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
        |
        | YARN-only:
        |  --executor-cores NUM        Number of cores per executor (Default: 1).
-         |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
        |  --queue QUEUE_NAME          The YARN queue to submit to (Default: 'default').
        |  --num-executors NUM         Number of executors to launch (Default: 2).
-         |  --files FILES               Comma separated list of files to be placed in the working dir
-         |                              of each executor.
        |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
        |                              working dir of each executor.""".stripMargin
    )
    SparkSubmit.exitFn()
  }
}
+
+ object SparkSubmitArguments {
+   /** Load properties present in the given file. */
+   def getPropertiesFromFile(file: File): Seq[(String, String)] = {
+     require(file.exists(), s"Properties file ${file.getName} does not exist")
+     val inputStream = new FileInputStream(file)
+     val properties = new Properties()
+     try {
+       properties.load(inputStream)
+     } catch {
+       case e: IOException =>
+         val message = s"Failed when loading Spark properties file ${file.getName}"
+         throw new SparkException(message, e)
+     }
+     properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
+   }
+ }
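A small self-contained sketch of how the new helper could be exercised. The file contents and property names below are made up for illustration, and the object name PropertiesFileSketch is hypothetical; it assumes the code compiles inside the org.apache.spark.deploy package, since the surrounding class is private[spark].

import java.io.{File, PrintWriter}

object PropertiesFileSketch {
  def main(args: Array[String]): Unit = {
    // Write a throwaway defaults file; in practice this would be
    // $SPARK_HOME/conf/spark-defaults.properties.
    val file = File.createTempFile("spark-defaults", ".properties")
    val writer = new PrintWriter(file)
    try {
      // java.util.Properties accepts either "key value" or "key=value" lines.
      writer.println("spark.master spark://host:7077")
      writer.println("spark.executor.memory 2g")
      writer.println("some.other.key ignored")
    } finally {
      writer.close()
    }

    // Returns the raw key/value pairs; keys that do not start with "spark"
    // are only warned about and dropped later, in getDefaultSparkProperties.
    SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
      println(s"$k = $v")
    }
  }
}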