[SPARK-5388] Provide a stable application submission gateway for standalone cluster mode #4216

Status: Closed · wants to merge 59 commits

Changes from 1 commit (59 commits total)
53e7c0e
Initial client, server, and all the messages
Jan 15, 2015
af9d9cb
Integrate REST protocol in standalone mode
Jan 17, 2015
6ff088d
Rename classes to generalize REST protocol
Jan 20, 2015
484bd21
Specify an ordering for fields in SubmitDriverRequestMessage
Jan 20, 2015
e958cae
Supported nested values in messages
Jan 20, 2015
544de1d
Major clean ups in code and comments
Jan 21, 2015
120ab9d
Support kill and request driver status through SparkSubmit
Jan 21, 2015
b44e103
Implement status requests + fix validation behavior
Jan 22, 2015
51c5ca6
Distinguish client and server side Spark versions
Jan 22, 2015
9e21b72
Action -> SparkSubmitAction (minor)
Jan 23, 2015
63c05b3
Remove MASTER as a field (minor)
Jan 23, 2015
206cae4
Refactor and add tests for the REST protocol
Jan 27, 2015
77774ba
Minor fixes
Jan 27, 2015
6568ca5
Merge branch 'master' of github.com:apache/spark into rest
Jan 27, 2015
d8d3717
Use a daemon thread pool for REST server
Jan 27, 2015
837475b
Show the REST port on the Master UI
Jan 27, 2015
e42c131
Add end-to-end tests for standalone REST protocol
Jan 28, 2015
efa5e18
Merge branch 'master' of github.com:apache/spark into rest
Jan 28, 2015
d7a1f9f
Fix local cluster tests
Jan 28, 2015
df90e8b
Use Jackson for JSON de/serialization
Jan 29, 2015
8d43486
Replace SubmitRestProtocolAction with class name
Jan 29, 2015
3db7379
Fix comments and name fields for better error messages
Jan 29, 2015
e2104e6
stable -> rest
Jan 29, 2015
914fdff
Merge branch 'master' of github.com:apache/spark into rest
Jan 29, 2015
9581df7
Clean up uses of exceptions
Jan 30, 2015
e2f7f5f
Provide more safeguard against missing fields
Jan 30, 2015
7ee6737
Merge branch 'master' of github.com:apache/spark into rest
Jan 30, 2015
bf696ff
Add checks for enabling REST when using kill/status
Jan 30, 2015
6c57b4b
Increase timeout in end-to-end tests
Jan 30, 2015
b2fef8b
Abstract the success field to the general response
Jan 30, 2015
ade28fd
Clean up REST response output in Spark submit
Feb 1, 2015
9229433
Reduce duplicate naming in REST field
Feb 1, 2015
1f1c03f
Use Jackson's DefaultScalaModule to simplify messages
Feb 2, 2015
42e5de4
Merge branch 'master' of github.com:apache/spark into rest
Feb 2, 2015
9e0d1af
Move some classes around to reduce number of files (minor)
Feb 2, 2015
581f7bf
Merge branch 'master' of github.com:apache/spark into rest
Feb 2, 2015
721819f
Provide more REST-like interface for submit/kill/status
Feb 4, 2015
f98660b
Version the protocol and include it in REST URL
Feb 4, 2015
792e112
Use specific HTTP response codes on error
Feb 4, 2015
bbbd329
Merge branch 'master' of github.com:apache/spark into rest
Feb 4, 2015
c643f64
Fix style
Feb 4, 2015
252d53c
Clean up server error handling behavior further
Feb 4, 2015
9165ae8
Fall back to Akka if endpoint was not REST
Feb 4, 2015
37538e0
Merge branch 'master' of github.com:apache/spark into rest
Feb 4, 2015
8188e61
Upgrade Jackson from 2.3.0 to 2.4.4
Feb 4, 2015
09f873a
Fix style
Feb 4, 2015
9fee16f
Include server protocol version on mismatch
Feb 5, 2015
cbd670b
Include unknown fields, if any, in server response
Feb 5, 2015
40e6095
Pass submit parameters through system properties
Feb 5, 2015
6fc7670
Report REST server response back to the user
Feb 5, 2015
c9a8ad7
Do not include appResource and mainClass as properties
Feb 5, 2015
b4695e7
Merge branch 'master' of github.com:apache/spark into rest
Feb 6, 2015
9c82a36
Minor comment and wording updates
Feb 6, 2015
d2b1ef8
Comment changes + minor code refactoring across the board
Feb 6, 2015
02b5cea
Fix tests
Feb 6, 2015
b9e2a08
Minor comments
Feb 6, 2015
dfe4bd7
Merge branch 'master' of github.com:apache/spark into rest
Feb 6, 2015
6f0c597
Use nullable fields for integer and boolean values
Feb 6, 2015
8d7ce07
Merge branch 'master' of github.com:apache/spark into rest
Feb 6, 2015
Specify an ordering for fields in SubmitDriverRequestMessage
Previously, the APP_ARG, SPARK_PROPERTY, and ENVIRONMENT_VARIABLE
fields would appear at random places in the JSON. Now they are grouped
together at the end of the JSON blob.
Andrew Or committed Jan 20, 2015
commit 484bd2172b847433c989d7c450fbbc99dddb1f56
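The grouping described in the commit message can be sketched as a rank-based sort: each field name maps to a numeric rank, and a stable sort keeps fields with equal ranks in their original relative order. This is an illustrative sketch only; field-name spellings such as `APP_ARG_0` are assumptions, since the real code matches on typed `SubmitDriverRequestField` values rather than strings.

```scala
// Hypothetical sketch of the field ordering introduced by this commit:
// ACTION first, SPARK_VERSION second, ordinary fields next, then the
// APP_ARG / SPARK_PROPERTY / ENVIRONMENT_VARIABLE groups, MESSAGE last.
object FieldOrderingSketch {
  // Assign each field name a numeric rank. The string prefixes below are
  // assumed spellings for illustration, not the actual Spark field names.
  def rank(field: String): Int = field match {
    case "ACTION"                                   => 0
    case "SPARK_VERSION"                            => 1
    case f if f.startsWith("APP_ARG_")              => 10 + f.stripPrefix("APP_ARG_").toInt
    case f if f.startsWith("SPARK_PROPERTY_")       => 100
    case f if f.startsWith("ENVIRONMENT_VARIABLE_") => 1000
    case "MESSAGE"                                  => Int.MaxValue
    case _                                          => 2
  }

  // sortBy is stable, so fields with equal ranks (e.g. all SPARK_PROPERTY
  // entries) keep their original relative order within the group.
  def sortFields(fields: Seq[(String, String)]): Seq[(String, String)] =
    fields.sortBy { case (k, _) => rank(k) }
}
```

The indexed rank `10 + index` for app args additionally preserves the argument order, mirroring the validation above that requires a continuous range of app arg indices starting from 0.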
@@ -83,6 +83,8 @@ private[spark] class SubmitDriverRequestMessage extends SubmitRestProtocolMessage(
SubmitDriverRequestField.ACTION,
SubmitDriverRequestField.requiredFields) {

import SubmitDriverRequestField._

// Ensure continuous range of app arg indices starting from 0
override def validate(): this.type = {
import SubmitDriverRequestField._
@@ -93,6 +95,22 @@
}
super.validate()
}

// List the fields in the following order:
// ACTION < SPARK_VERSION < * < APP_ARG < SPARK_PROPERTY < ENVIRONMENT_VARIABLE < MESSAGE
protected override def sortedFields: Seq[(SubmitRestProtocolField, String)] = {
fields.toSeq.sortBy { case (k, _) =>
k match {
case ACTION => 0
case SPARK_VERSION => 1
case APP_ARG(index) => 10 + index
case SPARK_PROPERTY(propKey) => 100
case ENVIRONMENT_VARIABLE(envKey) => 1000
case MESSAGE => Int.MaxValue
case _ => 2
}
}
}
}

private[spark] object SubmitDriverRequestMessage extends SubmitRestProtocolMessageCompanion {
@@ -24,16 +24,17 @@ import org.json4s.jackson.JsonMethods._
import org.json4s.JsonAST._

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.util.Utils
import org.apache.spark.util.{JsonProtocol, Utils}

/**
* A field used in a SubmitRestProtocolMessage.
* Three special fields ACTION, SPARK_VERSION, and MESSAGE are common across all messages.
*/
private[spark] abstract class SubmitRestProtocolField
private[spark] object SubmitRestProtocolField {
/** Return whether the provided field name refers to the ACTION field. */
def isActionField(field: String): Boolean = field == "ACTION"
def isSparkVersionField(field: String): Boolean = field == "SPARK_VERSION"
def isMessageField(field: String): Boolean = field == "MESSAGE"
}

/**
@@ -125,23 +126,26 @@ private[spark] abstract class SubmitRestProtocolMessage(

/** Return the JSON representation of this message. */
def toJson: String = {
val stringFields = fields
val jsonFields = sortedFields
.filter { case (_, v) => v != null }
.map { case (k, v) => (k.toString, v) }
val jsonFields = fieldsToJson(stringFields)
pretty(render(jsonFields))
.map { case (k, v) => JField(k.toString, JString(v)) }
.toList
pretty(render(JObject(jsonFields)))
}

/**
* Return the JSON representation of the message fields, putting ACTION first.
* This assumes that applying `org.apache.spark.util.JsonProtocol.mapFromJson`
* to the result yields the original input.
* Return a list of (field, value) pairs with the following ordering:
* ACTION < SPARK_VERSION < * < MESSAGE
*/
private def fieldsToJson(fields: Map[String, String]): JValue = {
val jsonFields = fields.toList
.sortBy { case (k, _) => if (isActionField(k)) 0 else 1 }
.map { case (k, v) => JField(k, JString(v)) }
JObject(jsonFields)
protected def sortedFields: Seq[(SubmitRestProtocolField, String)] = {
fields.toSeq.sortBy { case (k, _) =>
k.toString match {
case x if isActionField(x) => 0
case x if isSparkVersionField(x) => 1
case x if isMessageField(x) => Int.MaxValue
case _ => 2
}
}
}
}

@@ -155,7 +159,7 @@ private[spark] object SubmitRestProtocolMessage {
* If such a field does not exist in the JSON, throw an exception.
*/
def fromJson(json: String): SubmitRestProtocolMessage = {
val fields = org.apache.spark.util.JsonProtocol.mapFromJson(parse(json))
val fields = JsonProtocol.mapFromJson(parse(json))
val action = fields
.flatMap { case (k, v) => if (isActionField(k)) Some(v) else None }
.headOption
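The `fromJson` method above extracts the ACTION field from the flattened JSON map to decide which concrete message class to construct. A minimal self-contained sketch of that lookup, assuming the JSON has already been flattened into a `Map[String, String]` (the real code does this with `org.apache.spark.util.JsonProtocol.mapFromJson`):

```scala
// Sketch of the ACTION lookup performed in fromJson. The fields map and
// the error message wording are illustrative assumptions; the real code
// throws a SparkException when the ACTION field is missing.
def actionOf(fields: Map[String, String]): String =
  fields.collectFirst { case ("ACTION", v) => v }
    .getOrElse(throw new IllegalArgumentException(
      "Malformed message: no ACTION field found in JSON"))
```

Using `collectFirst` mirrors the original `flatMap` + `headOption` pattern: it scans the pairs once and stops at the first match.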