[SPARK-5388] Provide a stable application submission gateway for standalone cluster mode #4216

Closed
59 commits
53e7c0e
Initial client, server, and all the messages
Jan 15, 2015
af9d9cb
Integrate REST protocol in standalone mode
Jan 17, 2015
6ff088d
Rename classes to generalize REST protocol
Jan 20, 2015
484bd21
Specify an ordering for fields in SubmitDriverRequestMessage
Jan 20, 2015
e958cae
Supported nested values in messages
Jan 20, 2015
544de1d
Major clean ups in code and comments
Jan 21, 2015
120ab9d
Support kill and request driver status through SparkSubmit
Jan 21, 2015
b44e103
Implement status requests + fix validation behavior
Jan 22, 2015
51c5ca6
Distinguish client and server side Spark versions
Jan 22, 2015
9e21b72
Action -> SparkSubmitAction (minor)
Jan 23, 2015
63c05b3
Remove MASTER as a field (minor)
Jan 23, 2015
206cae4
Refactor and add tests for the REST protocol
Jan 27, 2015
77774ba
Minor fixes
Jan 27, 2015
6568ca5
Merge branch 'master' of github.com:apache/spark into rest
Jan 27, 2015
d8d3717
Use a daemon thread pool for REST server
Jan 27, 2015
837475b
Show the REST port on the Master UI
Jan 27, 2015
e42c131
Add end-to-end tests for standalone REST protocol
Jan 28, 2015
efa5e18
Merge branch 'master' of github.com:apache/spark into rest
Jan 28, 2015
d7a1f9f
Fix local cluster tests
Jan 28, 2015
df90e8b
Use Jackson for JSON de/serialization
Jan 29, 2015
8d43486
Replace SubmitRestProtocolAction with class name
Jan 29, 2015
3db7379
Fix comments and name fields for better error messages
Jan 29, 2015
e2104e6
stable -> rest
Jan 29, 2015
914fdff
Merge branch 'master' of github.com:apache/spark into rest
Jan 29, 2015
9581df7
Clean up uses of exceptions
Jan 30, 2015
e2f7f5f
Provide more safeguard against missing fields
Jan 30, 2015
7ee6737
Merge branch 'master' of github.com:apache/spark into rest
Jan 30, 2015
bf696ff
Add checks for enabling REST when using kill/status
Jan 30, 2015
6c57b4b
Increase timeout in end-to-end tests
Jan 30, 2015
b2fef8b
Abstract the success field to the general response
Jan 30, 2015
ade28fd
Clean up REST response output in Spark submit
Feb 1, 2015
9229433
Reduce duplicate naming in REST field
Feb 1, 2015
1f1c03f
Use Jackson's DefaultScalaModule to simplify messages
Feb 2, 2015
42e5de4
Merge branch 'master' of github.com:apache/spark into rest
Feb 2, 2015
9e0d1af
Move some classes around to reduce number of files (minor)
Feb 2, 2015
581f7bf
Merge branch 'master' of github.com:apache/spark into rest
Feb 2, 2015
721819f
Provide more REST-like interface for submit/kill/status
Feb 4, 2015
f98660b
Version the protocol and include it in REST URL
Feb 4, 2015
792e112
Use specific HTTP response codes on error
Feb 4, 2015
bbbd329
Merge branch 'master' of github.com:apache/spark into rest
Feb 4, 2015
c643f64
Fix style
Feb 4, 2015
252d53c
Clean up server error handling behavior further
Feb 4, 2015
9165ae8
Fall back to Akka if endpoint was not REST
Feb 4, 2015
37538e0
Merge branch 'master' of github.com:apache/spark into rest
Feb 4, 2015
8188e61
Upgrade Jackson from 2.3.0 to 2.4.4
Feb 4, 2015
09f873a
Fix style
Feb 4, 2015
9fee16f
Include server protocol version on mismatch
Feb 5, 2015
cbd670b
Include unknown fields, if any, in server response
Feb 5, 2015
40e6095
Pass submit parameters through system properties
Feb 5, 2015
6fc7670
Report REST server response back to the user
Feb 5, 2015
c9a8ad7
Do not include appResource and mainClass as properties
Feb 5, 2015
b4695e7
Merge branch 'master' of github.com:apache/spark into rest
Feb 6, 2015
9c82a36
Minor comment and wording updates
Feb 6, 2015
d2b1ef8
Comment changes + minor code refactoring across the board
Feb 6, 2015
02b5cea
Fix tests
Feb 6, 2015
b9e2a08
Minor comments
Feb 6, 2015
dfe4bd7
Merge branch 'master' of github.com:apache/spark into rest
Feb 6, 2015
6f0c597
Use nullable fields for integer and boolean values
Feb 6, 2015
8d7ce07
Merge branch 'master' of github.com:apache/spark into rest
Feb 6, 2015
8 changes: 8 additions & 0 deletions core/pom.xml
@@ -243,6 +243,14 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
6 changes: 6 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -103,6 +103,12 @@ span.expand-details {
float: right;
}

span.rest-uri {
font-size: 10pt;
font-style: italic;
color: gray;
}

pre {
font-size: 0.8em;
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2110,7 +2110,7 @@ object SparkContext extends Logging {

val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -29,8 +29,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
* Command-line parser for the driver client.
*/
private[spark] class ClientArguments(args: Array[String]) {
val defaultCores = 1
val defaultMemory = 512
import ClientArguments._

var cmd: String = "" // 'launch' or 'kill'
var logLevel = Level.WARN
@@ -39,9 +38,9 @@ private[spark] class ClientArguments(args: Array[String]) {
var master: String = ""
var jarUrl: String = ""
var mainClass: String = ""
var supervise: Boolean = false
var memory: Int = defaultMemory
var cores: Int = defaultCores
var supervise: Boolean = DEFAULT_SUPERVISE
var memory: Int = DEFAULT_MEMORY
var cores: Int = DEFAULT_CORES
private var _driverOptions = ListBuffer[String]()
def driverOptions = _driverOptions.toSeq

@@ -50,7 +49,7 @@ private[spark] class ClientArguments(args: Array[String]) {

parse(args.toList)

def parse(args: List[String]): Unit = args match {
private def parse(args: List[String]): Unit = args match {
case ("--cores" | "-c") :: IntParam(value) :: tail =>
cores = value
parse(tail)
@@ -106,9 +105,10 @@ private[spark] class ClientArguments(args: Array[String]) {
|Usage: DriverClient kill <active-master> <driver-id>
|
|Options:
| -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
| -c CORES, --cores CORES Number of cores to request (default: $DEFAULT_CORES)
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $DEFAULT_MEMORY)
| -s, --supervise Whether to restart the driver on failure
| (default: $DEFAULT_SUPERVISE)
| -v, --verbose Print more debugging output
""".stripMargin
System.err.println(usage)
@@ -117,6 +117,10 @@ private[spark] class ClientArguments(args: Array[String]) {
}

object ClientArguments {
private[spark] val DEFAULT_CORES = 1
private[spark] val DEFAULT_MEMORY = 512 // MB
private[spark] val DEFAULT_SUPERVISE = false

def isValidJarUrl(s: String): Boolean = {
try {
val uri = new URI(s)
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -148,15 +148,22 @@ private[deploy] object DeployMessages {

// Master to MasterWebUI

case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
status: MasterState) {
case class MasterStateResponse(
host: String,
port: Int,
restPort: Option[Int],
workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo],
completedApps: Array[ApplicationInfo],
activeDrivers: Array[DriverInfo],
completedDrivers: Array[DriverInfo],
status: MasterState) {

Utils.checkHost(host, "Required hostname")
assert (port > 0)

def uri = "spark://" + host + ":" + port
def restUri: Option[String] = restPort.map { p => "spark://" + host + ":" + p }
}

// WorkerWebUI to Worker
@@ -33,7 +33,11 @@ import org.apache.spark.util.Utils
* fault recovery without spinning up a lot of processes.
*/
private[spark]
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int)
class LocalSparkCluster(
numWorkers: Int,
coresPerWorker: Int,
memoryPerWorker: Int,
conf: SparkConf)
extends Logging {

private val localHostname = Utils.localHostName()
@@ -43,9 +47,11 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")

// Disable REST server on Master in this mode unless otherwise specified
val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false")
Contributor

why go out of our way to disable this in the local cluster?

Contributor Author

otherwise tests with local-cluster fail
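
A minimal sketch of the behaviour the line under review relies on: cloning the configuration and applying a default only when the key is absent, so an explicit user setting still wins and the caller's configuration is left untouched. `MiniConf` and `LocalClusterConfSketch` are hypothetical stand-ins written for this illustration, not the real `SparkConf`; only the key `spark.master.rest.enabled` comes from the diff.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SparkConf, used only to illustrate clone + setIfMissing.
final class MiniConf(
    private val settings: mutable.Map[String, String] = mutable.Map.empty[String, String]) {

  def set(key: String, value: String): MiniConf = { settings(key) = value; this }

  // Apply the value only when the caller has not already provided one.
  def setIfMissing(key: String, value: String): MiniConf = {
    if (!settings.contains(key)) settings(key) = value
    this
  }

  def get(key: String): Option[String] = settings.get(key)

  // Copy the settings so later changes do not leak back into the original.
  def copy(): MiniConf = new MiniConf(mutable.Map(settings.toSeq: _*))
}

object LocalClusterConfSketch {
  val RestEnabledKey = "spark.master.rest.enabled"

  // Mirrors the reviewed line: default the REST server to off unless explicitly set.
  def forLocalCluster(conf: MiniConf): MiniConf =
    conf.copy().setIfMissing(RestEnabledKey, "false")
}
```

Under these assumptions, a test that wants the REST server in local-cluster mode can still opt in by setting the key to `"true"` before the cluster is started.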


/* Start the Master */
val conf = new SparkConf(false)
val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
val masters = Array(masterUrl)
142 changes: 110 additions & 32 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,25 +18,35 @@
package org.apache.spark.deploy

import java.io.{File, PrintStream}
import java.lang.reflect.{Modifier, InvocationTargetException}
import java.lang.reflect.{InvocationTargetException, Modifier}
import java.net.URL

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.hadoop.fs.Path
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
import org.apache.ivy.core.module.descriptor._
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.spark.executor.ExecutorURLClassLoader

import org.apache.spark.deploy.rest._
import org.apache.spark.executor._
import org.apache.spark.util.Utils
import org.apache.spark.executor.ChildExecutorURLClassLoader
import org.apache.spark.executor.MutableURLClassLoader

/**
* Whether to submit, kill, or request the status of an application.
* The latter two operations are currently supported only for standalone cluster mode.
*/
private[spark] object SparkSubmitAction extends Enumeration {
type SparkSubmitAction = Value
val SUBMIT, KILL, REQUEST_STATUS = Value
}

/**
* Main gateway of launching a Spark application.
@@ -83,21 +93,74 @@ object SparkSubmit {
if (appArgs.verbose) {
printStream.println(appArgs)
}
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}

/** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
private def kill(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
.killSubmission(args.master, args.submissionToKill)
}

/**
* @return a tuple containing
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a list of system properties and env vars, and
* (4) the main class for the child
* Request the status of an existing submission using the REST protocol.
* Standalone cluster mode only.
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
private def requestStatus(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
}

// Values to return
/**
* Submit the application using the provided parameters.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*/
private[spark] def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
printStream.println("Running Spark using the REST application submission protocol.")
runMain(childArgs, childClasspath, sysProps, childMainClass)
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass)
}
}

/**
* Prepare the environment for submitting an application.
* This returns a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
* Exposed for testing.
Contributor

and it also modifies the args that are passed in, right?
that is a little confusing as a side effect; why not just return a new, modified args?

Contributor Author

because SparkSubmitArguments has a bunch of vars and cloning it won't be easy
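
For context, the same mutable-`var` style drives the fallback in `submit` earlier in this hunk: the `useRest` flag is flipped in place and the call is retried. A simplified, self-contained sketch of that pattern follows. `Args`, `RestConnectionException`, `runRest`, and `runLegacy` are hypothetical names introduced for illustration and are not the PR's classes.

```scala
// Hypothetical stand-ins; these names are not from the PR.
final class RestConnectionException(msg: String) extends Exception(msg)

// Mutable argument holder, mimicking the var-heavy SparkSubmitArguments.
final class Args(val master: String, var useRest: Boolean = true)

object FallbackSketch {
  // Try the REST gateway first; if the endpoint is not a REST server,
  // flip the flag in place and retry through the legacy gateway.
  def submit(args: Args, runRest: Args => String, runLegacy: Args => String): String = {
    if (args.useRest) {
      try {
        runRest(args)
      } catch {
        case e: RestConnectionException =>
          args.useRest = false // in-place mutation, visible to the caller afterwards
          submit(args, runRest, runLegacy)
      }
    } else {
      runLegacy(args)
    }
  }
}
```

The side effect the reviewer points out is visible here: after a failed REST attempt, the caller's `Args` instance has `useRest == false`. Returning a modified copy would avoid that, at the cost of having to copy every field.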

*/
private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
@@ -235,10 +298,13 @@ object SparkSubmit {
sysProp = "spark.driver.extraLibraryPath"),

// Standalone cluster only
// Do not set CL arguments here because there are multiple possibilities for the main class
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
sysProp = "spark.driver.supervise"),

// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
@@ -279,7 +345,6 @@ object SparkSubmit {
if (args.childArgs != null) { childArgs ++= args.childArgs }
}


// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null &&
@@ -301,14 +366,21 @@ object SparkSubmit {
sysProps.put("spark.jars", jars.mkString(","))
}

// In standalone-cluster mode, use Client as a wrapper around the user class
if (clusterManager == STANDALONE && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) {
childArgs += "--supervise"
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
// All Spark parameters are expected to be passed to the client through system properties.
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) { childArgs += "--supervise" }
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
}
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
@@ -345,7 +417,7 @@ object SparkSubmit {

// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sysProps -= ("spark.driver.host")
sysProps -= "spark.driver.host"
}

// Resolve paths in certain spark properties
@@ -374,9 +446,15 @@ object SparkSubmit {
(childArgs, childClasspath, sysProps, childMainClass)
}

private def launch(
childArgs: ArrayBuffer[String],
childClasspath: ArrayBuffer[String],
/**
* Run the main method of the child class using the provided launch environment.
*
* Note that this main class will not be the one provided by the user if we're
* running cluster deploy mode or python applications.
*/
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean = false) {
@@ -697,7 +775,7 @@ private[spark] object SparkSubmitUtils {
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
*/
private[spark] case class OptionAssigner(
private case class OptionAssigner(
value: String,
clusterManager: Int,
deployMode: Int,