[SPARK-5388] Provide a stable application submission gateway for standalone cluster mode #4216
Changes from all commits
@@ -18,25 +18,35 @@
 package org.apache.spark.deploy

 import java.io.{File, PrintStream}
-import java.lang.reflect.{Modifier, InvocationTargetException}
+import java.lang.reflect.{InvocationTargetException, Modifier}
 import java.net.URL

 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

 import org.apache.hadoop.fs.Path
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
-import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
-import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
+import org.apache.ivy.core.module.descriptor._
+import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
 import org.apache.ivy.core.report.ResolveReport
-import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
+import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
-import org.apache.spark.executor.ExecutorURLClassLoader
+
+import org.apache.spark.deploy.rest._
+import org.apache.spark.executor._
 import org.apache.spark.util.Utils
-import org.apache.spark.executor.ChildExecutorURLClassLoader
-import org.apache.spark.executor.MutableURLClassLoader

+/**
+ * Whether to submit, kill, or request the status of an application.
+ * The latter two operations are currently supported only for standalone cluster mode.
+ */
+private[spark] object SparkSubmitAction extends Enumeration {
+  type SparkSubmitAction = Value
+  val SUBMIT, KILL, REQUEST_STATUS = Value
+}
+
 /**
  * Main gateway of launching a Spark application.
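The headline addition in this hunk is the `SparkSubmitAction` enumeration, which gives `SparkSubmit.main` a single dispatch point over submit, kill, and status (see the next hunk). A minimal, self-contained sketch of the pattern — `Action` and `Dispatcher` are illustrative names, not Spark code:

```scala
// Sketch of the enumeration-dispatch pattern introduced above.
object Action extends Enumeration {
  type Action = Value
  val SUBMIT, KILL, REQUEST_STATUS = Value
}

object Dispatcher {
  def handle(action: Action.Action): Unit = action match {
    case Action.SUBMIT => println("submitting application")
    case Action.KILL => println("killing an existing submission")
    case Action.REQUEST_STATUS => println("requesting submission status")
  }

  def main(args: Array[String]): Unit = handle(Action.SUBMIT)
}
```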
@@ -83,21 +93,74 @@
     if (appArgs.verbose) {
       printStream.println(appArgs)
     }
-    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
-    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
+    appArgs.action match {
+      case SparkSubmitAction.SUBMIT => submit(appArgs)
+      case SparkSubmitAction.KILL => kill(appArgs)
+      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
+    }
   }

+  /** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
+  private def kill(args: SparkSubmitArguments): Unit = {
+    new StandaloneRestClient()
+      .killSubmission(args.master, args.submissionToKill)
+  }
+
   /**
-   * @return a tuple containing
-   *         (1) the arguments for the child process,
-   *         (2) a list of classpath entries for the child,
-   *         (3) a list of system properties and env vars, and
-   *         (4) the main class for the child
+   * Request the status of an existing submission using the REST protocol.
+   * Standalone cluster mode only.
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
+  private def requestStatus(args: SparkSubmitArguments): Unit = {
+    new StandaloneRestClient()
+      .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
+  }

-    // Values to return
+  /**
+   * Submit the application using the provided parameters.
+   *
+   * This runs in two steps. First, we prepare the launch environment by setting up
+   * the appropriate classpath, system properties, and application arguments for
+   * running the child main class based on the cluster manager and the deploy mode.
+   * Second, we use this launch environment to invoke the main method of the child
+   * main class.
+   */
+  private[spark] def submit(args: SparkSubmitArguments): Unit = {
+    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
+    // In standalone cluster mode, there are two submission gateways:
+    //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+    //   (2) The new REST-based gateway introduced in Spark 1.3
+    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
+    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
+    if (args.isStandaloneCluster && args.useRest) {
+      try {
+        printStream.println("Running Spark using the REST application submission protocol.")
+        runMain(childArgs, childClasspath, sysProps, childMainClass)
+      } catch {
+        // Fail over to use the legacy submission gateway
+        case e: SubmitRestConnectionException =>
+          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
+            "Falling back to legacy submission gateway instead.")
+          args.useRest = false
+          submit(args)
+      }
+    // In all other modes, just run the main class as prepared
+    } else {
+      runMain(childArgs, childClasspath, sysProps, childMainClass)
+    }
+  }
+
+  /**
+   * Prepare the environment for submitting an application.
+   * This returns a 4-tuple:
+   *   (1) the arguments for the child process,
+   *   (2) a list of classpath entries for the child,
+   *   (3) a map of system properties, and
+   *   (4) the main class for the child
+   * Exposed for testing.
+   */
+  private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)
+      : (Seq[String], Seq[String], Map[String, String], String) = {
+    // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
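The `kill` and `requestStatus` entry points above are thin wrappers over the new `StandaloneRestClient`; on the command line they are reached through the `--kill` and `--status` flags this PR adds to spark-submit. A hedged sketch of driving the client directly, using only the constructor and method signatures visible in this diff (the master URL and submission ID are placeholders):

```scala
import org.apache.spark.deploy.rest.StandaloneRestClient

object RestClientSketch {
  def main(args: Array[String]): Unit = {
    // Placeholders: a real standalone master endpoint and a real
    // submission ID returned by an earlier submit.
    val master = "spark://master:6066"
    val submissionId = "driver-20150125120000-0000"

    val client = new StandaloneRestClient()
    client.requestSubmissionStatus(master, submissionId)
    client.killSubmission(master, submissionId)
  }
}
```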
@@ -235,10 +298,13 @@
         sysProp = "spark.driver.extraLibraryPath"),

       // Standalone cluster only
+      // Do not set CL arguments here because there are multiple possibilities for the main class
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
       OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
-      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
+      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
+      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
+      OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
+        sysProp = "spark.driver.supervise"),

       // Yarn client only
       OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
@@ -279,7 +345,6 @@
       if (args.childArgs != null) { childArgs ++= args.childArgs }
     }

-
     // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null &&
@@ -301,14 +366,21 @@
       sysProps.put("spark.jars", jars.mkString(","))
     }

-    // In standalone-cluster mode, use Client as a wrapper around the user class
-    if (clusterManager == STANDALONE && deployMode == CLUSTER) {
-      childMainClass = "org.apache.spark.deploy.Client"
-      if (args.supervise) {
-        childArgs += "--supervise"
+    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
+    // All Spark parameters are expected to be passed to the client through system properties.
+    if (args.isStandaloneCluster) {
+      if (args.useRest) {
+        childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
+        childArgs += (args.primaryResource, args.mainClass)
+      } else {
+        // In legacy standalone cluster mode, use Client as a wrapper around the user class
+        childMainClass = "org.apache.spark.deploy.Client"
+        if (args.supervise) { childArgs += "--supervise" }
+        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
+        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
+        childArgs += "launch"
+        childArgs += (args.master, args.primaryResource, args.mainClass)
       }
-      childArgs += "launch"
-      childArgs += (args.master, args.primaryResource, args.mainClass)
       if (args.childArgs != null) {
         childArgs ++= args.childArgs
       }
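To make the branch above concrete, here is a standalone sketch of the wrapper class and child arguments each gateway ends up with. The field names mirror the diff, but this is illustrative code, not Spark's:

```scala
import scala.collection.mutable.ArrayBuffer

object GatewaySketch {
  def childCommand(useRest: Boolean, supervise: Boolean, driverMemory: String,
      driverCores: String, master: String, primaryResource: String,
      mainClass: String): (String, Seq[String]) = {
    val childArgs = new ArrayBuffer[String]()
    val childMainClass = if (useRest) {
      // REST gateway: everything else rides on system properties.
      childArgs += (primaryResource, mainClass)
      "org.apache.spark.deploy.rest.StandaloneRestClient"
    } else {
      // Legacy gateway: explicit flags for the Client wrapper.
      if (supervise) { childArgs += "--supervise" }
      Option(driverMemory).foreach { m => childArgs += ("--memory", m) }
      Option(driverCores).foreach { c => childArgs += ("--cores", c) }
      childArgs += "launch"
      childArgs += (master, primaryResource, mainClass)
      "org.apache.spark.deploy.Client"
    }
    (childMainClass, childArgs.toSeq)
  }

  def main(args: Array[String]): Unit = {
    println(childCommand(useRest = true, supervise = false, "2g", "4",
      "spark://host:7077", "app.jar", "com.example.Main"))
  }
}
```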
@@ -345,7 +417,7 @@

     // Ignore invalid spark.driver.host in cluster modes.
     if (deployMode == CLUSTER) {
-      sysProps -= ("spark.driver.host")
+      sysProps -= "spark.driver.host"
     }

     // Resolve paths in certain spark properties
@@ -374,9 +446,15 @@
     (childArgs, childClasspath, sysProps, childMainClass)
   }

-  private def launch(
-      childArgs: ArrayBuffer[String],
-      childClasspath: ArrayBuffer[String],
+  /**
+   * Run the main method of the child class using the provided launch environment.
+   *
+   * Note that this main class will not be the one provided by the user if we're
+   * running cluster deploy mode or python applications.
+   */
+  private def runMain(
+      childArgs: Seq[String],
+      childClasspath: Seq[String],
       sysProps: Map[String, String],
       childMainClass: String,
       verbose: Boolean = false) {
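For context on `runMain`: after setting up the classpath and system properties, it loads `childMainClass` reflectively and invokes its static `main`. A minimal sketch of that final step (the real method also installs a custom URL classloader and unwraps `InvocationTargetException`):

```scala
object ReflectiveMainSketch {
  // Load a class by name and invoke its main(Array[String]) reflectively.
  // The named class must be on the classpath at runtime.
  def invokeMain(mainClassName: String, args: Seq[String]): Unit = {
    val mainClass = Class.forName(mainClassName, true,
      Thread.currentThread.getContextClassLoader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // Static method, so the receiver is null; the whole Array[String]
    // is passed as the single argument to main.
    mainMethod.invoke(null, args.toArray)
  }
}
```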
@@ -697,7 +775,7 @@ private[spark] object SparkSubmitUtils {
  * Provides an indirection layer for passing arguments as system properties or flags to
  * the user's driver program or to downstream launcher tools.
  */
-private[spark] case class OptionAssigner(
+private case class OptionAssigner(
     value: String,
     clusterManager: Int,
     deployMode: Int,
Review comment: why go out of our way to disable this in the local cluster?

Reply: otherwise tests with local-cluster fail.