
Commit 466f84a

Author: Marcelo Vanzin
[SPARK-22941][core] Do not exit JVM when submit fails with in-process launcher.
The current in-process launcher implementation just calls the SparkSubmit object, which, in case of errors, will more often than not exit the JVM. This is not desirable since this launcher is meant to be used inside other applications, and that would kill the host application.

The change turns SparkSubmit into a class, and abstracts away some of the functionality used to print error messages and abort the submission process. The default implementation uses the logging system for messages, and throws exceptions for errors. As part of that I also moved some code that doesn't really belong in SparkSubmit to a better location.

The command line invocation of spark-submit now uses a special implementation of the SparkSubmit class that overrides those behaviors to do what is expected from the command line version (print to the terminal, exit the JVM, etc). A lot of the changes replace calls to methods such as "printErrorAndExit" with the new API.

As part of adding tests for this, I had to fix some small things in the launcher option parser so that things like "--version" can work when used in the launcher library.

There is still code that prints directly to the terminal, like all the Ivy-related code in SparkSubmitUtils, and other areas where some refactoring would help, like the CommandLineUtils class, but I chose to leave those alone to keep this change more focused.

Aside from existing and added unit tests, I ran the command line tools with a bunch of different arguments to make sure messages and errors behave like before.
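To make the shape of the change concrete, here is a minimal, hypothetical sketch of the pattern described above. The names are illustrative and do not mirror the real SparkSubmit API: the core logic reports problems through an overridable hook that throws by default, and the command-line entry point overrides that hook to print and exit.

// Hypothetical sketch only; real class and method names in Spark differ.
class InProcessSubmit {
  // Default behavior: surface errors as exceptions so a host JVM keeps running.
  protected def error(msg: String): Unit = throw new IllegalArgumentException(msg)

  def run(args: Array[String]): Unit = {
    if (args.isEmpty) {
      error("Must specify a primary resource (JAR or Python or R file)")
    }
    // ... resolve dependencies, prepare the environment, run the application ...
  }
}

// Command-line entry point: same logic, terminal-friendly failure mode.
object CommandLineSubmit extends InProcessSubmit {
  override protected def error(msg: String): Unit = {
    System.err.println(s"Error: $msg")
    sys.exit(1)
  }

  def main(args: Array[String]): Unit = run(args)
}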
1 parent 761565a commit 466f84a

File tree: 14 files changed (+386, -280 lines)


core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

Lines changed: 28 additions & 2 deletions
@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils {
+private[deploy] object DependencyUtils extends Logging {
 
   def resolveMavenDependencies(
       packagesExclusions: String,
@@ -72,7 +73,7 @@ private[deploy] object DependencyUtils {
   def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
     if (jars != null) {
       for (jar <- jars.split(",")) {
-        SparkSubmit.addJarToClasspath(jar, loader)
+        addJarToClasspath(jar, loader)
       }
     }
   }
@@ -148,6 +149,31 @@ private[deploy] object DependencyUtils {
     }.mkString(",")
   }
 
+  def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
+    val uri = Utils.resolveURI(localJar)
+    uri.getScheme match {
+      case "file" | "local" =>
+        val file = new File(uri.getPath)
+        if (file.exists()) {
+          loader.addURL(file.toURI.toURL)
+        } else {
+          logWarning(s"Local jar $file does not exist, skipping.")
+        }
+      case _ =>
+        logWarning(s"Skip remote jar $uri.")
+    }
+  }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  def mergeFileLists(lists: String*): String = {
+    val merged = lists.filterNot(StringUtils.isBlank)
+      .flatMap(Utils.stringToSeq)
+    if (merged.nonEmpty) merged.mkString(",") else null
+  }
+
   private def splitOnFragment(path: String): (URI, Option[String]) = {
     val uri = Utils.resolveURI(path)
     val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
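For context, a small usage sketch of the two helpers that move into DependencyUtils above. This is illustrative only and not part of the patch; since the object is private[deploy], it assumes the code is compiled inside the org.apache.spark.deploy package, and the object name is made up.

package org.apache.spark.deploy

import java.net.URL

import org.apache.spark.util.MutableURLClassLoader

// Hypothetical caller exercising the relocated helpers.
object DependencyUtilsSketch {
  def main(args: Array[String]): Unit = {
    val loader = new MutableURLClassLoader(Array.empty[URL], getClass.getClassLoader)

    // Null-safe merge of comma-separated jar lists; a null argument means "no files".
    val jars = DependencyUtils.mergeFileLists("/tmp/a.jar,/tmp/b.jar", null)

    // Local jars are added to the classloader; remote URIs only produce a warning.
    Option(jars).foreach(_.split(",").foreach(DependencyUtils.addJarToClasspath(_, loader)))
  }
}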

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 161 additions & 157 deletions
(Large diff not shown.)

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 43 additions & 45 deletions
@@ -29,7 +29,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 import scala.util.Try
 
+import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
+import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
@@ -40,7 +42,7 @@ import org.apache.spark.util.Utils
  * The env argument is used for testing.
  */
 private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
-  extends SparkSubmitArgumentsParser {
+  extends SparkSubmitArgumentsParser with Logging {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -84,8 +86,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
-    // scalastyle:off println
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
+    if (verbose) {
+      logInfo(s"Using properties file: $propertiesFile")
+    }
     Option(propertiesFile).foreach { filename =>
       val properties = Utils.getPropertiesFromFile(filename)
       properties.foreach { case (k, v) =>
@@ -94,21 +97,16 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       // Property files may contain sensitive information, so redact before printing
       if (verbose) {
         Utils.redact(properties).foreach { case (k, v) =>
-          SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+          logInfo(s"Adding default property: $k=$v")
         }
       }
     }
-    // scalastyle:on println
     defaultProperties
   }
 
   // Set parameters from command line arguments
-  try {
-    parse(args.asJava)
-  } catch {
-    case e: IllegalArgumentException =>
-      SparkSubmit.printErrorAndExit(e.getMessage())
-  }
+  parse(args.asJava)
+
   // Populate `sparkProperties` map from properties file
   mergeDefaultSparkProperties()
   // Remove keys that don't start with "spark." from `sparkProperties`.
@@ -140,7 +138,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     sparkProperties.foreach { case (k, v) =>
       if (!k.startsWith("spark.")) {
         sparkProperties -= k
-        SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+        logWarning(s"Ignoring non-spark config property: $k=$v")
       }
     }
   }
@@ -213,10 +211,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
           }
         } catch {
           case _: Exception =>
-            SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
+            error(s"Cannot load main class from JAR $primaryResource")
         }
       case _ =>
-        SparkSubmit.printErrorAndExit(
+        error(
           s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
           "Please specify a class through --class.")
     }
@@ -246,6 +244,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case SUBMIT => validateSubmitArguments()
       case KILL => validateKillArguments()
      case REQUEST_STATUS => validateStatusRequestArguments()
+      case PRINT_VERSION =>
     }
   }
 
@@ -254,30 +253,30 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       printUsageAndExit(-1)
     }
     if (primaryResource == null) {
-      SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python or R file)")
+      error("Must specify a primary resource (JAR or Python or R file)")
     }
     if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
-      SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
+      error("No main class set in JAR; please specify one with --class")
     }
     if (driverMemory != null
         && Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) {
-      SparkSubmit.printErrorAndExit("Driver Memory must be a positive number")
+      error("Driver Memory must be a positive number")
     }
     if (executorMemory != null
         && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
-      SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
+      error("Executor Memory cores must be a positive number")
     }
     if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
+      error("Executor cores must be a positive number")
     }
     if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Total executor cores must be a positive number")
+      error("Total executor cores must be a positive number")
     }
     if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Number of executors must be a positive number")
+      error("Number of executors must be a positive number")
     }
     if (pyFiles != null && !isPython) {
-      SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
+      error("--py-files given but primary resource is not a Python script")
     }
 
     if (master.startsWith("yarn")) {
@@ -289,27 +288,26 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     }
 
     if (proxyUser != null && principal != null) {
-      SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
+      error("Only one of --proxy-user or --principal can be provided.")
     }
   }
 
   private def validateKillArguments(): Unit = {
     if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      SparkSubmit.printErrorAndExit(
-        "Killing submissions is only supported in standalone or Mesos mode!")
+      error("Killing submissions is only supported in standalone or Mesos mode!")
     }
     if (submissionToKill == null) {
-      SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
+      error("Please specify a submission to kill.")
     }
   }
 
   private def validateStatusRequestArguments(): Unit = {
     if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      SparkSubmit.printErrorAndExit(
+      error(
         "Requesting submission statuses is only supported in standalone or Mesos mode!")
     }
     if (submissionToRequestStatusFor == null) {
-      SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
+      error("Please specify a submission to request status for.")
     }
   }
 
@@ -366,7 +364,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 
       case DEPLOY_MODE =>
        if (value != "client" && value != "cluster") {
-          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+          error("--deploy-mode must be either \"client\" or \"cluster\"")
         }
         deployMode = value
 
@@ -403,14 +401,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case KILL_SUBMISSION =>
         submissionToKill = value
         if (action != null) {
-          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
+          error(s"Action cannot be both $action and $KILL.")
         }
         action = KILL
 
       case STATUS =>
         submissionToRequestStatusFor = value
         if (action != null) {
-          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
+          error(s"Action cannot be both $action and $REQUEST_STATUS.")
         }
         action = REQUEST_STATUS
 
@@ -442,7 +440,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         repositories = value
 
       case CONF =>
-        val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
+        val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)
         sparkProperties(confName) = confValue
 
       case PROXY_USER =>
@@ -461,15 +459,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         verbose = true
 
       case VERSION =>
-        SparkSubmit.printVersionAndExit()
+        action = SparkSubmitAction.PRINT_VERSION
 
       case USAGE_ERROR =>
         printUsageAndExit(1)
 
       case _ =>
-        throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
+        error(s"Unexpected argument '$opt'.")
     }
-    true
+    action != SparkSubmitAction.PRINT_VERSION
   }
 
   /**
@@ -480,7 +478,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
    */
   override protected def handleUnknown(opt: String): Boolean = {
     if (opt.startsWith("-")) {
-      SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+      error(s"Unrecognized option '$opt'.")
     }
 
     primaryResource =
@@ -499,20 +497,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
-    // scalastyle:off println
-    val outStream = SparkSubmit.printStream
     if (unknownParam != null) {
-      outStream.println("Unknown/unsupported param " + unknownParam)
+      logInfo("Unknown/unsupported param " + unknownParam)
    }
     val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
       """Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
        |Usage: spark-submit --kill [submission ID] --master [spark://...]
        |Usage: spark-submit --status [submission ID] --master [spark://...]
        |Usage: spark-submit run-example [options] example-class [example args]""".stripMargin)
-    outStream.println(command)
+    logInfo(command)
 
     val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
-    outStream.println(
+    logInfo(
       s"""
       |Options:
      | --master MASTER_URL spark://host:port, mesos://host:port, yarn,
@@ -594,12 +590,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     )
 
     if (SparkSubmit.isSqlShell(mainClass)) {
-      outStream.println("CLI options:")
-      outStream.println(getSqlShellOptions())
+      logInfo("CLI options:")
+      logInfo(getSqlShellOptions())
     }
-    // scalastyle:on println
 
-    SparkSubmit.exitFn(exitCode)
+    throw new SparkUserAppException(exitCode)
   }
 
   /**
@@ -653,4 +648,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       System.setErr(currentErr)
     }
   }
+
+  private def error(msg: String): Unit = throw new SparkException(msg)
+
 }
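The practical effect of routing everything through error() is that argument problems now surface as exceptions instead of killing the JVM. Below is a hedged, simplified sketch of what a caller could now observe; it assumes code living in the org.apache.spark.deploy package (SparkSubmitArguments is private[deploy]), and real in-process use would go through the launcher library rather than constructing the arguments class directly.

package org.apache.spark.deploy

import org.apache.spark.SparkException

// Illustrative only: with this change, a bad --deploy-mode value surfaces as a
// SparkException thrown during argument parsing rather than a JVM exit.
object InProcessErrorSketch {
  def main(args: Array[String]): Unit = {
    try {
      new SparkSubmitArguments(Seq("--deploy-mode", "bogus", "app.jar"))
    } catch {
      case e: SparkException =>
        println(s"Submission rejected: ${e.getMessage}")
    }
  }
}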

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+import org.apache.spark.util._
 
 /**
  * Utility object for launching driver programs such that they share fate with the Worker process.
@@ -88,7 +88,7 @@ object DriverWrapper extends Logging {
     val jars = {
       val jarsProp = sys.props.get("spark.jars").orNull
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-        SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+        DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
      } else {
         jarsProp
       }

core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala

Lines changed: 4 additions & 14 deletions
@@ -33,24 +33,14 @@ private[spark] trait CommandLineUtils {
   private[spark] var printStream: PrintStream = System.err
 
   // scalastyle:off println
-
-  private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
+  private[spark] def printMessage(str: String): Unit = printStream.println(str)
+  // scalastyle:on println
 
   private[spark] def printErrorAndExit(str: String): Unit = {
-    printStream.println("Error: " + str)
-    printStream.println("Run with --help for usage help or --verbose for debug output")
+    printMessage("Error: " + str)
+    printMessage("Run with --help for usage help or --verbose for debug output")
     exitFn(1)
   }
 
-  // scalastyle:on println
-
-  private[spark] def parseSparkConfProperty(pair: String): (String, String) = {
-    pair.split("=", 2).toSeq match {
-      case Seq(k, v) => (k, v)
-      case _ => printErrorAndExit(s"Spark config without '=': $pair")
-        throw new SparkException(s"Spark config without '=': $pair")
-    }
-  }
-
   def main(args: Array[String]): Unit
 }
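parseSparkConfProperty does not disappear: per the SparkSubmitArguments diff above it is now called on SparkSubmitUtils. A sketch of the equivalent behavior follows, under the assumption that the relocated helper keeps the old contract of rejecting a pair without '='; the object name here is made up for illustration.

import org.apache.spark.SparkException

// Sketch of the assumed contract only; the real method lives in SparkSubmitUtils.
object ConfPropertySketch {
  def parseSparkConfProperty(pair: String): (String, String) = {
    pair.split("=", 2) match {
      case Array(k, v) => (k, v)
      case _ => throw new SparkException(s"Spark config without '=': $pair")
    }
  }

  // Example: parseSparkConfProperty("spark.executor.memory=2g")
  //          == ("spark.executor.memory", "2g")
}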
