Skip to content

Commit

Permalink
stable -> rest
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Jan 29, 2015
1 parent 3db7379 commit e2104e6
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ span.expand-details {
float: right;
}

span.stable-uri {
span.rest-uri {
font-size: 10pt;
font-style: italic;
color: gray;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private[deploy] object DeployMessages {
case class MasterStateResponse(
host: String,
port: Int,
stablePort: Option[Int],
restPort: Option[Int],
workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo],
completedApps: Array[ApplicationInfo],
Expand All @@ -163,7 +163,7 @@ private[deploy] object DeployMessages {
assert (port > 0)

def uri = "spark://" + host + ":" + port
def stableUri: Option[String] = stablePort.map { p => "spark://" + host + ":" + p }
def restUri: Option[String] = restPort.map { p => "spark://" + host + ":" + p }
}

// WorkerWebUI to Worker
Expand Down
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ object SparkSubmit {
}

/**
* Kill an existing driver using the stable REST protocol. Standalone cluster mode only.
* Kill an existing driver using the REST application submission protocol.
* Standalone cluster mode only.
*/
private def kill(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient().killDriver(args.master, args.driverToKill)
}

/**
* Request the status of an existing driver using the stable REST protocol.
* Request the status of an existing driver using the REST application submission protocol.
* Standalone cluster mode only.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
Expand All @@ -112,15 +113,15 @@ object SparkSubmit {
* Second, we use this launch environment to invoke the main method of the child
* main class.
*
* As of Spark 1.3, a stable REST-based application submission gateway is introduced.
* As of Spark 1.3, a REST-based application submission gateway is introduced.
* If this is enabled, then we will run standalone cluster mode by passing the submit
* parameters directly to a REST client, which will submit the application using the
* REST protocol instead.
*/
private[spark] def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
if (args.isStandaloneCluster && args.isRestEnabled) {
printStream.println("Running standalone cluster mode using the stable REST protocol.")
printStream.println("Running Spark using the REST application submission protocol.")
new StandaloneRestClient().submitDriver(args)
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass)
Expand Down Expand Up @@ -305,7 +306,7 @@ object SparkSubmit {
}

// In standalone-cluster mode, use Client as a wrapper around the user class
// Note that we won't actually launch this class if we're using the stable REST protocol
// Note that we won't actually launch this class if we're using the REST protocol
if (args.isStandaloneCluster && !args.isRestEnabled) {
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
master.startsWith("spark://") && deployMode == "cluster"
}

/** Return whether the stable application submission REST gateway is enabled. */
/** Return whether the REST application submission protocol is enabled. */
def isRestEnabled: Boolean = {
sparkProperties.get("spark.submit.rest.enabled").getOrElse("false").toBoolean
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private[spark] class Master(
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
private val restServer =
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 17077)
val port = conf.getInt("spark.master.rest.port", 6066)
Some(new StandaloneRestServer(this, host, port))
} else {
None
Expand Down Expand Up @@ -910,6 +910,6 @@ private[spark] object Master extends Logging {
val timeout = AkkaUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.stablePort)
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ private[master] object MasterMessages {

case object BoundPortsRequest

case class BoundPortsResponse(actorPort: Int, webUIPort: Int, stablePort: Option[Int])
case class BoundPortsResponse(actorPort: Int, webUIPort: Int, restPort: Option[Int])
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<ul class="unstyled">
<li><strong>URL:</strong> {state.uri}</li>
{
state.stableUri.map { uri =>
state.restUri.map { uri =>
<li>
<strong>Stable URL:</strong> {uri}
<span class="stable-uri"> (for standalone cluster mode in Spark 1.3+)</span>
<strong>REST URL:</strong> {uri}
<span class="rest-uri"> (for standalone cluster mode in Spark 1.3+)</span>
</li>
}.getOrElse { Seq.empty }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.Worker

/**
* End-to-end tests for the stable application submission protocol in standalone mode.
* End-to-end tests for the REST application submission protocol in standalone mode.
*/
class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
private val systemsToStop = new ArrayBuffer[ActorSystem]
Expand Down Expand Up @@ -89,7 +89,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B

/**
* Start a local cluster containing one Master and a few Workers.
* Do not use org.apache.spark.deploy.LocalCluster here because we want the REST URL.
* Do not use [[org.apache.spark.deploy.LocalSparkCluster]] here because we want the REST URL.
* Return the Master's REST URL to which applications should be submitted.
*/
private def startLocalCluster(): String = {
Expand All @@ -112,7 +112,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B
masterRestUrl
}

/** Submit the StandaloneRestApp and return the corresponding driver ID. */
/** Submit the [[StandaloneRestApp]] and return the corresponding driver ID. */
private def submitApplication(resultsFile: File, numbers: Seq[Int], size: Int): String = {
val appArgs = Seq(resultsFile.getAbsolutePath) ++ numbers.map(_.toString) ++ Seq(size.toString)
val commandLineArgs = Array(
Expand Down Expand Up @@ -164,7 +164,7 @@ private object StandaloneRestProtocolSuite {
private val pathPrefix = "org/apache/spark/deploy/rest"

/**
* Create a jar that contains all the class files needed for running the StandaloneRestApp.
* Create a jar that contains all the class files needed for running the [[StandaloneRestApp]].
* Return the absolute path to that jar.
*/
def createJar(): String = {
Expand All @@ -184,7 +184,7 @@ private object StandaloneRestProtocolSuite {
}

/**
* Return a list of class files compiled for StandaloneRestApp.
* Return a list of class files compiled for [[StandaloneRestApp]].
* This includes all the anonymous classes used in the application.
*/
private def getClassFiles: Seq[File] = {
Expand All @@ -197,7 +197,7 @@ private object StandaloneRestProtocolSuite {
}

/**
* Sample application to be submitted to the cluster using the stable gateway.
* Sample application to be submitted to the cluster using the REST gateway.
* All relevant classes will be packaged into a jar at run time.
*/
object StandaloneRestApp {
Expand Down

0 comments on commit e2104e6

Please sign in to comment.