Skip to content

Commit

Permalink
Distinguish client and server side Spark versions
Browse files Browse the repository at this point in the history
Otherwise it's a little ambiguous what we mean by SPARK_VERSION.
  • Loading branch information
Andrew Or committed Jan 23, 2015
1 parent b44e103 commit 51c5ca6
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ private[spark] abstract class DriverStatusRequestField extends SubmitRestProtoco
private[spark] object DriverStatusRequestField
extends SubmitRestProtocolFieldCompanion[DriverStatusRequestField] {
case object ACTION extends DriverStatusRequestField
case object SPARK_VERSION extends DriverStatusRequestField
case object CLIENT_SPARK_VERSION extends DriverStatusRequestField
case object MESSAGE extends DriverStatusRequestField
case object MASTER extends DriverStatusRequestField
case object DRIVER_ID extends DriverStatusRequestField
override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID)
override val requiredFields = Seq(ACTION, CLIENT_SPARK_VERSION, MASTER, DRIVER_ID)
override val optionalFields = Seq(MESSAGE)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private[spark] abstract class DriverStatusResponseField extends SubmitRestProtoc
private[spark] object DriverStatusResponseField
extends SubmitRestProtocolFieldCompanion[DriverStatusResponseField] {
case object ACTION extends DriverStatusResponseField
case object SPARK_VERSION extends DriverStatusResponseField
case object SERVER_SPARK_VERSION extends DriverStatusResponseField
case object MESSAGE extends DriverStatusResponseField
case object MASTER extends DriverStatusResponseField
case object DRIVER_ID extends DriverStatusResponseField
Expand All @@ -33,7 +33,7 @@ private[spark] object DriverStatusResponseField
case object DRIVER_STATE extends DriverStatusResponseField
case object WORKER_ID extends DriverStatusResponseField
case object WORKER_HOST_PORT extends DriverStatusResponseField
override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID, SUCCESS)
override val requiredFields = Seq(ACTION, SERVER_SPARK_VERSION, MASTER, DRIVER_ID, SUCCESS)
override val optionalFields = Seq(MESSAGE, DRIVER_STATE, WORKER_ID, WORKER_HOST_PORT)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ package org.apache.spark.deploy.rest
private[spark] abstract class ErrorField extends SubmitRestProtocolField
private[spark] object ErrorField extends SubmitRestProtocolFieldCompanion[ErrorField] {
case object ACTION extends ErrorField
case object SPARK_VERSION extends ErrorField
case object SERVER_SPARK_VERSION extends ErrorField
case object MESSAGE extends ErrorField
override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE)
override val requiredFields = Seq(ACTION, SERVER_SPARK_VERSION, MESSAGE)
override val optionalFields = Seq.empty
}

/**
* An error message exchanged in the stable application submission REST protocol.
* An error message sent from the cluster manager
* in the stable application submission REST protocol.
*/
private[spark] class ErrorMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ private[spark] abstract class KillDriverRequestField extends SubmitRestProtocolF
private[spark] object KillDriverRequestField
extends SubmitRestProtocolFieldCompanion[KillDriverRequestField] {
case object ACTION extends KillDriverRequestField
case object SPARK_VERSION extends KillDriverRequestField
case object CLIENT_SPARK_VERSION extends KillDriverRequestField
case object MESSAGE extends KillDriverRequestField
case object MASTER extends KillDriverRequestField
case object DRIVER_ID extends KillDriverRequestField
override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID)
override val requiredFields = Seq(ACTION, CLIENT_SPARK_VERSION, MASTER, DRIVER_ID)
override val optionalFields = Seq(MESSAGE)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ private[spark] abstract class KillDriverResponseField extends SubmitRestProtocol
private[spark] object KillDriverResponseField
extends SubmitRestProtocolFieldCompanion[KillDriverResponseField] {
case object ACTION extends KillDriverResponseField
case object SPARK_VERSION extends KillDriverResponseField
case object SERVER_SPARK_VERSION extends KillDriverResponseField
case object MESSAGE extends KillDriverResponseField
case object MASTER extends KillDriverResponseField
case object DRIVER_ID extends KillDriverResponseField
case object SUCCESS extends KillDriverResponseField
override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE, MASTER, DRIVER_ID, SUCCESS)
override val requiredFields = Seq(ACTION, SERVER_SPARK_VERSION, MESSAGE, MASTER, DRIVER_ID, SUCCESS)
override val optionalFields = Seq.empty
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
val dm = Option(args.driverMemory).map { m => Utils.memoryStringToMb(m).toString }.orNull
val em = Option(args.executorMemory).map { m => Utils.memoryStringToMb(m).toString }.orNull
val message = new SubmitDriverRequestMessage()
.setField(SPARK_VERSION, sparkVersion)
.setField(CLIENT_SPARK_VERSION, sparkVersion)
.setField(MASTER, args.master)
.setField(APP_NAME, args.name)
.setField(APP_RESOURCE, args.primaryResource)
Expand All @@ -115,7 +115,7 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
driverId: String): KillDriverRequestMessage = {
import KillDriverRequestField._
new KillDriverRequestMessage()
.setField(SPARK_VERSION, sparkVersion)
.setField(CLIENT_SPARK_VERSION, sparkVersion)
.setField(MASTER, master)
.setField(DRIVER_ID, driverId)
}
Expand All @@ -126,7 +126,7 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
driverId: String): DriverStatusRequestMessage = {
import DriverStatusRequestField._
new DriverStatusRequestMessage()
.setField(SPARK_VERSION, sparkVersion)
.setField(CLIENT_SPARK_VERSION, sparkVersion)
.setField(MASTER, master)
.setField(DRIVER_ID, driverId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private[spark] class StandaloneRestServerHandler(
val response = AkkaUtils.askWithReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription), masterActor, askTimeout)
new SubmitDriverResponseMessage()
.setField(SPARK_VERSION, sparkVersion)
.setField(SERVER_SPARK_VERSION, sparkVersion)
.setField(MESSAGE, response.message)
.setField(MASTER, masterUrl)
.setField(SUCCESS, response.success.toString)
Expand All @@ -77,7 +77,7 @@ private[spark] class StandaloneRestServerHandler(
val response = AkkaUtils.askWithReply[KillDriverResponse](
RequestKillDriver(driverId), masterActor, askTimeout)
new KillDriverResponseMessage()
.setField(SPARK_VERSION, sparkVersion)
.setField(SERVER_SPARK_VERSION, sparkVersion)
.setField(MESSAGE, response.message)
.setField(MASTER, masterUrl)
.setField(DRIVER_ID, driverId)
Expand All @@ -97,7 +97,7 @@ private[spark] class StandaloneRestServerHandler(
s"Exception from the cluster:\n$e\n$stackTraceString"
}
new DriverStatusResponseMessage()
.setField(SPARK_VERSION, sparkVersion)
.setField(SERVER_SPARK_VERSION, sparkVersion)
.setField(MASTER, masterUrl)
.setField(DRIVER_ID, driverId)
.setField(SUCCESS, response.found.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[spark] abstract class SubmitDriverRequestField extends SubmitRestProtoco
private[spark] object SubmitDriverRequestField
extends SubmitRestProtocolFieldCompanion[SubmitDriverRequestField] {
case object ACTION extends SubmitDriverRequestField
case object SPARK_VERSION extends SubmitDriverRequestField
case object CLIENT_SPARK_VERSION extends SubmitDriverRequestField
case object MESSAGE extends SubmitDriverRequestField
case object MASTER extends SubmitDriverRequestField
case object APP_NAME extends SubmitDriverRequestField
Expand All @@ -51,7 +51,7 @@ private[spark] object SubmitDriverRequestField
case object APP_ARGS extends SubmitDriverRequestField
case object SPARK_PROPERTIES extends SubmitDriverRequestField
case object ENVIRONMENT_VARIABLES extends SubmitDriverRequestField
override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, APP_NAME, APP_RESOURCE)
override val requiredFields = Seq(ACTION, CLIENT_SPARK_VERSION, MASTER, APP_NAME, APP_RESOURCE)
override val optionalFields = Seq(MESSAGE, MAIN_CLASS, JARS, FILES, PY_FILES, DRIVER_MEMORY,
DRIVER_CORES, DRIVER_EXTRA_JAVA_OPTIONS, DRIVER_EXTRA_CLASS_PATH, DRIVER_EXTRA_LIBRARY_PATH,
SUPERVISE_DRIVER, EXECUTOR_MEMORY, TOTAL_EXECUTOR_CORES, APP_ARGS, SPARK_PROPERTIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ private[spark] abstract class SubmitDriverResponseField extends SubmitRestProtoc
private[spark] object SubmitDriverResponseField
extends SubmitRestProtocolFieldCompanion[SubmitDriverResponseField] {
case object ACTION extends SubmitDriverResponseField
case object SPARK_VERSION extends SubmitDriverResponseField
case object SERVER_SPARK_VERSION extends SubmitDriverResponseField
case object MESSAGE extends SubmitDriverResponseField
case object MASTER extends SubmitDriverResponseField
case object SUCCESS extends SubmitDriverResponseField
case object DRIVER_ID extends SubmitDriverResponseField
override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE, MASTER, SUCCESS)
override val requiredFields = Seq(ACTION, SERVER_SPARK_VERSION, MESSAGE, MASTER, SUCCESS)
override val optionalFields = Seq(DRIVER_ID)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ import org.apache.spark.util.Utils

/**
* A field used in a SubmitRestProtocolMessage.
* Three special fields ACTION, SPARK_VERSION, and MESSAGE are common across all messages.
* There are a few special fields:
* - ACTION entirely specifies the type of the message and is required in all messages
* - MESSAGE contains arbitrary messages and is common, but not required, in all messages
* - CLIENT_SPARK_VERSION is required in all messages sent from the client
* - SERVER_SPARK_VERSION is required in all messages sent from the server
*/
private[spark] abstract class SubmitRestProtocolField
private[spark] object SubmitRestProtocolField {
def isActionField(field: String): Boolean = field == "ACTION"
def isSparkVersionField(field: String): Boolean = field == "SPARK_VERSION"
def isSparkVersionField(field: String): Boolean = field.endsWith("_SPARK_VERSION")
def isMessageField(field: String): Boolean = field == "MESSAGE"
}

Expand Down Expand Up @@ -129,7 +133,7 @@ private[spark] abstract class SubmitRestProtocolMessage(

/**
* Return a JObject that represents the JSON form of this message.
* This orders the fields by ACTION (first) < SPARK_VERSION < MESSAGE < * (last)
* This orders the fields by ACTION (first) < SERVER_SPARK_VERSION < MESSAGE < * (last)
* and ignores fields with null values.
*/
protected def toJsonObject: JObject = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[spark] abstract class SubmitRestServerHandler extends AbstractHandler wi
private def handleError(message: String): ErrorMessage = {
import ErrorField._
new ErrorMessage()
.setField(SPARK_VERSION, sparkVersion)
.setField(SERVER_SPARK_VERSION, sparkVersion)
.setField(MESSAGE, message)
}
}

0 comments on commit 51c5ca6

Please sign in to comment.