diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala index d435687606e3f..e6b513bd4b1c0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala @@ -24,11 +24,11 @@ private[spark] abstract class DriverStatusRequestField extends SubmitRestProtoco private[spark] object DriverStatusRequestField extends SubmitRestProtocolFieldCompanion[DriverStatusRequestField] { case object ACTION extends DriverStatusRequestField - case object SPARK_VERSION extends DriverStatusRequestField + case object CLIENT_SPARK_VERSION extends DriverStatusRequestField case object MESSAGE extends DriverStatusRequestField case object MASTER extends DriverStatusRequestField case object DRIVER_ID extends DriverStatusRequestField - override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID) + override val requiredFields = Seq(ACTION, CLIENT_SPARK_VERSION, MASTER, DRIVER_ID) override val optionalFields = Seq(MESSAGE) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala index a0264643890a4..7a65f31c711f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala @@ -24,7 +24,7 @@ private[spark] abstract class DriverStatusResponseField extends SubmitRestProtoc private[spark] object DriverStatusResponseField extends SubmitRestProtocolFieldCompanion[DriverStatusResponseField] { case object ACTION extends DriverStatusResponseField - case object SPARK_VERSION extends DriverStatusResponseField + case object SERVER_SPARK_VERSION extends DriverStatusResponseField case object MESSAGE extends DriverStatusResponseField case object MASTER extends DriverStatusResponseField case object DRIVER_ID extends DriverStatusResponseField @@ -33,7 +33,7 @@ private[spark] object DriverStatusResponseField case object DRIVER_STATE extends DriverStatusResponseField case object WORKER_ID extends DriverStatusResponseField case object WORKER_HOST_PORT extends DriverStatusResponseField - override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID, SUCCESS) + override val requiredFields = Seq(ACTION, SERVER_SPARK_VERSION, MASTER, DRIVER_ID, SUCCESS) override val optionalFields = Seq(MESSAGE, DRIVER_STATE, WORKER_ID, WORKER_HOST_PORT) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala index aefd7b60d32af..88d33462dd446 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala @@ -23,14 +23,15 @@ package org.apache.spark.deploy.rest private[spark] abstract class ErrorField extends SubmitRestProtocolField private[spark] object ErrorField extends SubmitRestProtocolFieldCompanion[ErrorField] { case object ACTION extends ErrorField - case object SPARK_VERSION extends ErrorField + case object SERVER_SPARK_VERSION extends ErrorField case object MESSAGE extends ErrorField - override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE) + override val requiredFields = Seq(ACTION, SERVER_SPARK_VERSION, MESSAGE) override val optionalFields = Seq.empty } /** - * An error message exchanged in the stable application submission REST protocol. + * An error message sent from the cluster manager + * in the stable application submission REST protocol. */ private[spark] class ErrorMessage extends SubmitRestProtocolMessage( SubmitRestProtocolAction.ERROR, diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala index 3353bfba5a690..ae3c62d496b7e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala @@ -24,11 +24,11 @@ private[spark] abstract class KillDriverRequestField extends SubmitRestProtocolF private[spark] object KillDriverRequestField extends SubmitRestProtocolFieldCompanion[KillDriverRequestField] { case object ACTION extends KillDriverRequestField - case object SPARK_VERSION extends KillDriverRequestField + case object CLIENT_SPARK_VERSION extends KillDriverRequestField case object MESSAGE extends KillDriverRequestField case object MASTER extends KillDriverRequestField case object DRIVER_ID extends KillDriverRequestField - override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID) + override val requiredFields = Seq(ACTION, CLIENT_SPARK_VERSION, MASTER, DRIVER_ID) override val optionalFields = Seq(MESSAGE) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponseMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponseMessage.scala index 974fcb9936fcb..b5dc4ee557cb2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponseMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponseMessage.scala @@ -24,12 +24,12 @@ private[spark] abstract class KillDriverResponseField extends SubmitRestProtocol private[spark] object KillDriverResponseField extends SubmitRestProtocolFieldCompanion[KillDriverResponseField] { case object ACTION extends KillDriverResponseField - case object SPARK_VERSION extends KillDriverResponseField + case object SERVER_SPARK_VERSION extends KillDriverResponseField case object MESSAGE extends KillDriverResponseField case object MASTER extends KillDriverResponseField case object DRIVER_ID extends KillDriverResponseField case object SUCCESS extends KillDriverResponseField - override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE, MASTER, DRIVER_ID, SUCCESS) + override val requiredFields = Seq(ACTION, SERVER_SPARK_VERSION, MESSAGE, MASTER, DRIVER_ID, SUCCESS) override val optionalFields = Seq.empty } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala index 43164ae3a4c88..4f5c31c080fb2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala @@ -88,7 +88,7 @@ private[spark] class StandaloneRestClient extends SubmitRestClient { val dm = Option(args.driverMemory).map { m => Utils.memoryStringToMb(m).toString }.orNull val em = Option(args.executorMemory).map { m => Utils.memoryStringToMb(m).toString }.orNull val message = new SubmitDriverRequestMessage() - .setField(SPARK_VERSION, sparkVersion) + .setField(CLIENT_SPARK_VERSION, sparkVersion) .setField(MASTER, args.master) .setField(APP_NAME, args.name) .setField(APP_RESOURCE, args.primaryResource) @@ -115,7 +115,7 @@ private[spark] class StandaloneRestClient extends SubmitRestClient { driverId: String): KillDriverRequestMessage = { import KillDriverRequestField._ new KillDriverRequestMessage() - .setField(SPARK_VERSION, sparkVersion) + .setField(CLIENT_SPARK_VERSION, sparkVersion) .setField(MASTER, master) .setField(DRIVER_ID, driverId) } @@ -126,7 +126,7 @@ private[spark] class StandaloneRestClient extends SubmitRestClient { driverId: String): DriverStatusRequestMessage = { import DriverStatusRequestField._ new DriverStatusRequestMessage() - .setField(SPARK_VERSION, sparkVersion) + .setField(CLIENT_SPARK_VERSION, sparkVersion) .setField(MASTER, master) .setField(DRIVER_ID, driverId) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 563ee1c251442..5a5afcc22833c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -62,7 +62,7 @@ private[spark] class StandaloneRestServerHandler( val response = AkkaUtils.askWithReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription), masterActor, askTimeout) new SubmitDriverResponseMessage() - .setField(SPARK_VERSION, sparkVersion) + .setField(SERVER_SPARK_VERSION, sparkVersion) .setField(MESSAGE, response.message) .setField(MASTER, masterUrl) .setField(SUCCESS, response.success.toString) @@ -77,7 +77,7 @@ private[spark] class StandaloneRestServerHandler( val response = AkkaUtils.askWithReply[KillDriverResponse]( RequestKillDriver(driverId), masterActor, askTimeout) new KillDriverResponseMessage() - .setField(SPARK_VERSION, sparkVersion) + .setField(SERVER_SPARK_VERSION, sparkVersion) .setField(MESSAGE, response.message) .setField(MASTER, masterUrl) .setField(DRIVER_ID, driverId) @@ -97,7 +97,7 @@ private[spark] class StandaloneRestServerHandler( s"Exception from the cluster:\n$e\n$stackTraceString" } new DriverStatusResponseMessage() - .setField(SPARK_VERSION, sparkVersion) + .setField(SERVER_SPARK_VERSION, sparkVersion) .setField(MASTER, masterUrl) .setField(DRIVER_ID, driverId) .setField(SUCCESS, response.found.toString) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequestMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequestMessage.scala index 1ce867febcf9a..b4c29d171b730 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequestMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequestMessage.scala @@ -31,7 +31,7 @@ private[spark] abstract class SubmitDriverRequestField extends SubmitRestProtoco private[spark] object SubmitDriverRequestField extends SubmitRestProtocolFieldCompanion[SubmitDriverRequestField] { case object ACTION extends SubmitDriverRequestField - case object SPARK_VERSION extends SubmitDriverRequestField + case object CLIENT_SPARK_VERSION extends SubmitDriverRequestField case object MESSAGE extends SubmitDriverRequestField case object MASTER extends SubmitDriverRequestField case object APP_NAME extends SubmitDriverRequestField @@ -51,7 +51,7 @@ private[spark] object SubmitDriverRequestField case object APP_ARGS extends SubmitDriverRequestField case object SPARK_PROPERTIES extends SubmitDriverRequestField case object ENVIRONMENT_VARIABLES extends SubmitDriverRequestField - override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, APP_NAME, APP_RESOURCE) + override val requiredFields = Seq(ACTION, CLIENT_SPARK_VERSION, MASTER, APP_NAME, APP_RESOURCE) override val optionalFields = Seq(MESSAGE, MAIN_CLASS, JARS, FILES, PY_FILES, DRIVER_MEMORY, DRIVER_CORES, DRIVER_EXTRA_JAVA_OPTIONS, DRIVER_EXTRA_CLASS_PATH, DRIVER_EXTRA_LIBRARY_PATH, SUPERVISE_DRIVER, EXECUTOR_MEMORY, TOTAL_EXECUTOR_CORES, APP_ARGS, SPARK_PROPERTIES, diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponseMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponseMessage.scala index 4551707660377..7b3524b10f6c1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponseMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponseMessage.scala @@ -24,12 +24,12 @@ private[spark] abstract class SubmitDriverResponseField extends SubmitRestProtoc private[spark] object SubmitDriverResponseField extends SubmitRestProtocolFieldCompanion[SubmitDriverResponseField] { case object ACTION extends SubmitDriverResponseField - case object SPARK_VERSION extends SubmitDriverResponseField + case object SERVER_SPARK_VERSION extends SubmitDriverResponseField case object MESSAGE extends SubmitDriverResponseField case object MASTER extends SubmitDriverResponseField case object SUCCESS extends SubmitDriverResponseField case object DRIVER_ID extends SubmitDriverResponseField - override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE, MASTER, SUCCESS) + override val requiredFields = Seq(ACTION, SERVER_SPARK_VERSION, MESSAGE, MASTER, SUCCESS) override val optionalFields = Seq(DRIVER_ID) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala index 6419520743eb0..2969886f9094e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala @@ -28,12 +28,16 @@ import org.apache.spark.util.Utils /** * A field used in a SubmitRestProtocolMessage. - * Three special fields ACTION, SPARK_VERSION, and MESSAGE are common across all messages. + * There are a few special fields: + * - ACTION entirely specifies the type of the message and is required in all messages + * - MESSAGE contains arbitrary messages and is common, but not required, in all messages + * - CLIENT_SPARK_VERSION is required in all messages sent from the client + * - SERVER_SPARK_VERSION is required in all messages sent from the server */ private[spark] abstract class SubmitRestProtocolField private[spark] object SubmitRestProtocolField { def isActionField(field: String): Boolean = field == "ACTION" - def isSparkVersionField(field: String): Boolean = field == "SPARK_VERSION" + def isSparkVersionField(field: String): Boolean = field.endsWith("_SPARK_VERSION") def isMessageField(field: String): Boolean = field == "MESSAGE" } @@ -129,7 +133,7 @@ private[spark] abstract class SubmitRestProtocolMessage( /** * Return a JObject that represents the JSON form of this message. - * This orders the fields by ACTION (first) < SPARK_VERSION < MESSAGE < * (last) + * This orders the fields by ACTION (first) < SERVER_SPARK_VERSION < MESSAGE < * (last) * and ignores fields with null values. */ protected def toJsonObject: JObject = { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala index c659dfddbf6ab..1cf02c0efd3bf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala @@ -130,7 +130,7 @@ private[spark] abstract class SubmitRestServerHandler extends AbstractHandler wi private def handleError(message: String): ErrorMessage = { import ErrorField._ new ErrorMessage() - .setField(SPARK_VERSION, sparkVersion) + .setField(SERVER_SPARK_VERSION, sparkVersion) .setField(MESSAGE, message) } }