Commit a78c752

Merge branch 'master' of github.com:apache/spark into SPARK-23917
2 parents: a4fd616 + 6a2289e

72 files changed: +1545 −1011 lines changed


core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

Lines changed: 28 additions & 2 deletions
@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils {
+private[deploy] object DependencyUtils extends Logging {
 
   def resolveMavenDependencies(
       packagesExclusions: String,
@@ -75,7 +76,7 @@ private[deploy] object DependencyUtils {
   def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
     if (jars != null) {
       for (jar <- jars.split(",")) {
-        SparkSubmit.addJarToClasspath(jar, loader)
+        addJarToClasspath(jar, loader)
       }
     }
   }
@@ -151,6 +152,31 @@ private[deploy] object DependencyUtils {
     }.mkString(",")
   }
 
+  def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
+    val uri = Utils.resolveURI(localJar)
+    uri.getScheme match {
+      case "file" | "local" =>
+        val file = new File(uri.getPath)
+        if (file.exists()) {
+          loader.addURL(file.toURI.toURL)
+        } else {
+          logWarning(s"Local jar $file does not exist, skipping.")
+        }
+      case _ =>
+        logWarning(s"Skip remote jar $uri.")
+    }
+  }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  def mergeFileLists(lists: String*): String = {
+    val merged = lists.filterNot(StringUtils.isBlank)
+      .flatMap(Utils.stringToSeq)
+    if (merged.nonEmpty) merged.mkString(",") else null
+  }
+
   private def splitOnFragment(path: String): (URI, Option[String]) = {
     val uri = Utils.resolveURI(path)
     val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
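For reference, the behavior of the new mergeFileLists helper can be illustrated with a small self-contained sketch. StringUtils.isBlank (Apache Commons Lang) and Utils.stringToSeq from the diff are stood in for with plain Scala here, so this is an approximation for illustration, not the actual Spark code:

object MergeFileListsSketch {
  // Stand-in for org.apache.commons.lang3.StringUtils.isBlank
  private def isBlank(s: String): Boolean = s == null || s.trim.isEmpty

  // Stand-in for Utils.stringToSeq: split on commas and drop empty entries
  private def stringToSeq(s: String): Seq[String] =
    s.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  // Same shape as the added method: blank or null lists are dropped, the rest
  // are flattened and re-joined with commas; null is returned if nothing remains.
  def mergeFileLists(lists: String*): String = {
    val merged = lists.filterNot(isBlank).flatMap(stringToSeq)
    if (merged.nonEmpty) merged.mkString(",") else null
  }

  def main(args: Array[String]): Unit = {
    println(mergeFileLists("a.jar,b.jar", null, "", "c.jar")) // a.jar,b.jar,c.jar
    println(mergeFileLists(null, ""))                         // null
  }
}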

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 161 additions & 157 deletions
Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 44 additions & 46 deletions
@@ -29,7 +29,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 import scala.util.Try
 
+import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
+import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
@@ -40,7 +42,7 @@ import org.apache.spark.util.Utils
  * The env argument is used for testing.
  */
 private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
-  extends SparkSubmitArgumentsParser {
+  extends SparkSubmitArgumentsParser with Logging {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -85,8 +87,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
-    // scalastyle:off println
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
+    if (verbose) {
+      logInfo(s"Using properties file: $propertiesFile")
+    }
     Option(propertiesFile).foreach { filename =>
       val properties = Utils.getPropertiesFromFile(filename)
       properties.foreach { case (k, v) =>
@@ -95,21 +98,16 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       // Property files may contain sensitive information, so redact before printing
       if (verbose) {
         Utils.redact(properties).foreach { case (k, v) =>
-          SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+          logInfo(s"Adding default property: $k=$v")
         }
       }
     }
-    // scalastyle:on println
     defaultProperties
   }
 
   // Set parameters from command line arguments
-  try {
-    parse(args.asJava)
-  } catch {
-    case e: IllegalArgumentException =>
-      SparkSubmit.printErrorAndExit(e.getMessage())
-  }
+  parse(args.asJava)
+
   // Populate `sparkProperties` map from properties file
   mergeDefaultSparkProperties()
   // Remove keys that don't start with "spark." from `sparkProperties`.
@@ -141,7 +139,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     sparkProperties.foreach { case (k, v) =>
       if (!k.startsWith("spark.")) {
         sparkProperties -= k
-        SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+        logWarning(s"Ignoring non-spark config property: $k=$v")
       }
     }
   }
@@ -215,10 +213,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         }
       } catch {
         case _: Exception =>
-          SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
+          error(s"Cannot load main class from JAR $primaryResource")
       }
     case _ =>
-      SparkSubmit.printErrorAndExit(
+      error(
         s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
         "Please specify a class through --class.")
   }
@@ -248,6 +246,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case SUBMIT => validateSubmitArguments()
       case KILL => validateKillArguments()
      case REQUEST_STATUS => validateStatusRequestArguments()
+      case PRINT_VERSION =>
     }
   }
 
@@ -256,62 +255,61 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       printUsageAndExit(-1)
     }
     if (primaryResource == null) {
-      SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python or R file)")
+      error("Must specify a primary resource (JAR or Python or R file)")
     }
     if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
-      SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
+      error("No main class set in JAR; please specify one with --class")
     }
     if (driverMemory != null
         && Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) {
-      SparkSubmit.printErrorAndExit("Driver Memory must be a positive number")
+      error("Driver memory must be a positive number")
     }
     if (executorMemory != null
        && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
-      SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
+      error("Executor memory must be a positive number")
     }
     if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
+      error("Executor cores must be a positive number")
     }
     if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Total executor cores must be a positive number")
+      error("Total executor cores must be a positive number")
     }
     if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Number of executors must be a positive number")
+      error("Number of executors must be a positive number")
     }
     if (pyFiles != null && !isPython) {
-      SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
+      error("--py-files given but primary resource is not a Python script")
     }
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
       if (!hasHadoopEnv && !Utils.isTesting) {
-        throw new Exception(s"When running with master '$master' " +
+        error(s"When running with master '$master' " +
          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
       }
     }
 
     if (proxyUser != null && principal != null) {
-      SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
+      error("Only one of --proxy-user or --principal can be provided.")
     }
   }
 
   private def validateKillArguments(): Unit = {
     if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      SparkSubmit.printErrorAndExit(
-        "Killing submissions is only supported in standalone or Mesos mode!")
+      error("Killing submissions is only supported in standalone or Mesos mode!")
     }
     if (submissionToKill == null) {
-      SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
+      error("Please specify a submission to kill.")
    }
   }
 
   private def validateStatusRequestArguments(): Unit = {
     if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      SparkSubmit.printErrorAndExit(
+      error(
        "Requesting submission statuses is only supported in standalone or Mesos mode!")
     }
     if (submissionToRequestStatusFor == null) {
-      SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
+      error("Please specify a submission to request status for.")
     }
   }
 
@@ -368,7 +366,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 
       case DEPLOY_MODE =>
         if (value != "client" && value != "cluster") {
-          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+          error("--deploy-mode must be either \"client\" or \"cluster\"")
         }
         deployMode = value
 
@@ -405,14 +403,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case KILL_SUBMISSION =>
         submissionToKill = value
         if (action != null) {
-          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
+          error(s"Action cannot be both $action and $KILL.")
         }
         action = KILL
 
       case STATUS =>
         submissionToRequestStatusFor = value
         if (action != null) {
-          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
+          error(s"Action cannot be both $action and $REQUEST_STATUS.")
         }
         action = REQUEST_STATUS
 
@@ -444,7 +442,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         repositories = value
 
       case CONF =>
-        val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
+        val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)
         sparkProperties(confName) = confValue
 
       case PROXY_USER =>
@@ -463,15 +461,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         verbose = true
 
       case VERSION =>
-        SparkSubmit.printVersionAndExit()
+        action = SparkSubmitAction.PRINT_VERSION
 
       case USAGE_ERROR =>
        printUsageAndExit(1)
 
       case _ =>
-        throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
+        error(s"Unexpected argument '$opt'.")
     }
-    true
+    action != SparkSubmitAction.PRINT_VERSION
   }
 
   /**
@@ -482,7 +480,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
    */
   override protected def handleUnknown(opt: String): Boolean = {
     if (opt.startsWith("-")) {
-      SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+      error(s"Unrecognized option '$opt'.")
     }
 
     primaryResource =
@@ -501,20 +499,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
-    // scalastyle:off println
-    val outStream = SparkSubmit.printStream
     if (unknownParam != null) {
-      outStream.println("Unknown/unsupported param " + unknownParam)
+      logInfo("Unknown/unsupported param " + unknownParam)
     }
     val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
       """Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
        |Usage: spark-submit --kill [submission ID] --master [spark://...]
        |Usage: spark-submit --status [submission ID] --master [spark://...]
        |Usage: spark-submit run-example [options] example-class [example args]""".stripMargin)
-    outStream.println(command)
+    logInfo(command)
 
     val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
-    outStream.println(
+    logInfo(
       s"""
        |Options:
        |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
@@ -596,12 +592,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     )
 
     if (SparkSubmit.isSqlShell(mainClass)) {
-      outStream.println("CLI options:")
-      outStream.println(getSqlShellOptions())
+      logInfo("CLI options:")
+      logInfo(getSqlShellOptions())
     }
-    // scalastyle:on println
 
-    SparkSubmit.exitFn(exitCode)
+    throw new SparkUserAppException(exitCode)
   }
 
   /**
@@ -655,4 +650,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       System.setErr(currentErr)
     }
   }
+
+  private def error(msg: String): Unit = throw new SparkException(msg)
+
 }
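The net effect of these changes is that SparkSubmitArguments no longer prints and exits directly: invalid arguments now surface as a SparkException thrown from the private error helper, and printUsageAndExit throws SparkUserAppException, leaving the caller to decide how to exit. A minimal sketch of that pattern follows, using plain exception types and illustrative names rather than the actual Spark classes:

object ValidationPatternSketch {
  // Illustrative stand-in for the private error(msg) helper added in the diff:
  // validation failures throw instead of printing and calling System.exit.
  private def error(msg: String): Unit = throw new IllegalArgumentException(msg)

  private def validate(primaryResource: String): Unit = {
    if (primaryResource == null) {
      error("Must specify a primary resource (JAR or Python or R file)")
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      validate(primaryResource = null)
    } catch {
      case e: IllegalArgumentException =>
        // The caller decides how to report the failure and which exit code to use.
        Console.err.println(e.getMessage)
        sys.exit(1)
    }
  }
}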
