From 1f1c03ffdc541cc665b27655c40f10c6e938a1db Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 1 Feb 2015 16:26:10 -0800 Subject: [PATCH] Use Jackson's DefaultScalaModule to simplify messages Instead of explicitly defining getters and setters in the messages, we let Jackson's scala module do the work. This simplifies the code for each message significantly, though at the expense of reducing the level of type safety for users who implement their own clients and servers. --- core/pom.xml | 8 + .../deploy/rest/DriverStatusRequest.scala | 4 +- .../deploy/rest/DriverStatusResponse.scala | 19 +- .../spark/deploy/rest/ErrorResponse.scala | 9 +- .../spark/deploy/rest/KillDriverRequest.scala | 4 +- .../deploy/rest/KillDriverResponse.scala | 4 +- .../deploy/rest/StandaloneRestClient.scala | 71 ++-- .../deploy/rest/StandaloneRestServer.scala | 75 ++-- .../deploy/rest/SubmitDriverRequest.scala | 134 ++----- .../deploy/rest/SubmitDriverResponse.scala | 4 +- .../spark/deploy/rest/SubmitRestClient.scala | 2 +- .../deploy/rest/SubmitRestProtocolField.scala | 30 -- .../rest/SubmitRestProtocolMessage.scala | 117 +++--- .../spark/deploy/rest/SubmitRestServer.scala | 7 +- .../rest/StandaloneRestProtocolSuite.scala | 22 +- .../deploy/rest/SubmitRestProtocolSuite.scala | 334 ++++++++---------- pom.xml | 11 + 17 files changed, 360 insertions(+), 495 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolField.scala diff --git a/core/pom.xml b/core/pom.xml index 31e919a1c831a..d4f2e94b5a143 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -213,6 +213,14 @@ com.codahale.metrics metrics-graphite + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.module + jackson-module-scala_2.10 + org.apache.derby derby diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequest.scala index e25bb45668e54..0c15816b4be2e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequest.scala @@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest * A request to query the status of a driver in the REST application submission protocol. */ class DriverStatusRequest extends SubmitRestProtocolRequest { - private val driverId = new SubmitRestProtocolField[String] - def getDriverId: String = driverId.toString - def setDriverId(s: String): this.type = setField(driverId, s) + var driverId: String = null protected override def doValidate(): Unit = { super.doValidate() assertFieldIsSet(driverId, "driverId") diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponse.scala b/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponse.scala index 568204ac6c815..97b9d02f1e9f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponse.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponse.scala @@ -21,21 +21,12 @@ package org.apache.spark.deploy.rest * A response to the [[DriverStatusRequest]] in the REST application submission protocol. */ class DriverStatusResponse extends SubmitRestProtocolResponse { - private val driverId = new SubmitRestProtocolField[String] - // standalone cluster mode only - private val driverState = new SubmitRestProtocolField[String] - private val workerId = new SubmitRestProtocolField[String] - private val workerHostPort = new SubmitRestProtocolField[String] - - def getDriverId: String = driverId.toString - def getDriverState: String = driverState.toString - def getWorkerId: String = workerId.toString - def getWorkerHostPort: String = workerHostPort.toString + var driverId: String = null - def setDriverId(s: String): this.type = setField(driverId, s) - def setDriverState(s: String): this.type = setField(driverState, s) - def setWorkerId(s: String): this.type = setField(workerId, s) - def setWorkerHostPort(s: String): this.type = setField(workerHostPort, s) + // standalone cluster mode only + var driverState: String = null + var workerId: String = null + var workerHostPort: String = null protected override def doValidate(): Unit = { super.doValidate() diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/ErrorResponse.scala b/core/src/main/scala/org/apache/spark/deploy/rest/ErrorResponse.scala index 0bc003f97ab5b..6bb674dc88a38 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/ErrorResponse.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/ErrorResponse.scala @@ -17,20 +17,17 @@ package org.apache.spark.deploy.rest -import com.fasterxml.jackson.annotation.JsonIgnore - /** * An error response message used in the REST application submission protocol. */ class ErrorResponse extends SubmitRestProtocolResponse { - setSuccess("false") - // Don't bother logging success = false in the JSON - @JsonIgnore - override def getSuccess: String = super.getSuccess + // request was unsuccessful + success = "false" protected override def doValidate(): Unit = { super.doValidate() assertFieldIsSet(message, "message") + assert(!success.toBoolean, s"The 'success' field must be false in $messageType.") } } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequest.scala index 99f52a0839549..7660864fbbf64 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequest.scala @@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest * A request to kill a driver in the REST application submission protocol. */ class KillDriverRequest extends SubmitRestProtocolRequest { - private val driverId = new SubmitRestProtocolField[String] - def getDriverId: String = driverId.toString - def setDriverId(s: String): this.type = setField(driverId, s) + var driverId: String = null protected override def doValidate(): Unit = { super.doValidate() assertFieldIsSet(driverId, "driverId") diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponse.scala b/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponse.scala index cb0112653b2fc..1366d6ba77c91 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponse.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponse.scala @@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest * A response to the [[KillDriverRequest]] in the REST application submission protocol. */ class KillDriverResponse extends SubmitRestProtocolResponse { - private val driverId = new SubmitRestProtocolField[String] - def getDriverId: String = driverId.toString - def setDriverId(s: String): this.type = setField(driverId, s) + var driverId: String = null protected override def doValidate(): Unit = { super.doValidate() assertFieldIsSet(driverId, "driverId") diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala index eb671978e2aff..df7319235c653 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala @@ -43,14 +43,19 @@ private[spark] class StandaloneRestClient extends SubmitRestClient { case s: SubmitDriverResponse => s case _ => return response } - val submitSuccess = submitResponse.getSuccess.toBoolean + // Report status of submitted driver to user + val submitSuccess = submitResponse.success.toBoolean if (submitSuccess) { - val driverId = submitResponse.getDriverId - logInfo(s"Driver successfully submitted as $driverId. Polling driver state...") - pollSubmittedDriverStatus(args.master, driverId) + val driverId = submitResponse.driverId + if (driverId != null) { + logInfo(s"Driver successfully submitted as $driverId. Polling driver state...") + pollSubmittedDriverStatus(args.master, driverId) + } else { + logError("Application successfully submitted, but driver ID was not provided!") + } } else { - val submitMessage = submitResponse.getMessage - logError(s"Application submission failed: $submitMessage") + val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("") + logError("Application submission failed" + failMessage) } submitResponse } @@ -78,12 +83,12 @@ private[spark] class StandaloneRestClient extends SubmitRestClient { case s: DriverStatusResponse => s case _ => return } - val statusSuccess = statusResponse.getSuccess.toBoolean + val statusSuccess = statusResponse.success.toBoolean if (statusSuccess) { - val driverState = Option(statusResponse.getDriverState) - val workerId = Option(statusResponse.getWorkerId) - val workerHostPort = Option(statusResponse.getWorkerHostPort) - val exception = Option(statusResponse.getMessage) + val driverState = Option(statusResponse.driverState) + val workerId = Option(statusResponse.workerId) + val workerHostPort = Option(statusResponse.workerHostPort) + val exception = Option(statusResponse.message) // Log driver state, if present driverState match { case Some(state) => logInfo(s"State of driver $driverId is now $state.") @@ -105,21 +110,21 @@ private[spark] class StandaloneRestClient extends SubmitRestClient { /** Construct a submit driver request message. */ protected override def constructSubmitRequest(args: SparkSubmitArguments): SubmitDriverRequest = { - val message = new SubmitDriverRequest() - .setSparkVersion(sparkVersion) - .setAppName(args.name) - .setAppResource(args.primaryResource) - .setMainClass(args.mainClass) - .setJars(args.jars) - .setFiles(args.files) - .setDriverMemory(args.driverMemory) - .setDriverCores(args.driverCores) - .setDriverExtraJavaOptions(args.driverExtraJavaOptions) - .setDriverExtraClassPath(args.driverExtraClassPath) - .setDriverExtraLibraryPath(args.driverExtraLibraryPath) - .setSuperviseDriver(args.supervise.toString) - .setExecutorMemory(args.executorMemory) - .setTotalExecutorCores(args.totalExecutorCores) + val message = new SubmitDriverRequest + message.clientSparkVersion = sparkVersion + message.appName = args.name + message.appResource = args.primaryResource + message.mainClass = args.mainClass + message.jars = args.jars + message.files = args.files + message.driverMemory = args.driverMemory + message.driverCores = args.driverCores + message.driverExtraJavaOptions = args.driverExtraJavaOptions + message.driverExtraClassPath = args.driverExtraClassPath + message.driverExtraLibraryPath = args.driverExtraLibraryPath + message.superviseDriver = args.supervise.toString + message.executorMemory = args.executorMemory + message.totalExecutorCores = args.totalExecutorCores args.childArgs.foreach(message.addAppArg) args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) } sys.env.foreach { case (k, v) => @@ -132,18 +137,20 @@ private[spark] class StandaloneRestClient extends SubmitRestClient { protected override def constructKillRequest( master: String, driverId: String): KillDriverRequest = { - new KillDriverRequest() - .setSparkVersion(sparkVersion) - .setDriverId(driverId) + val k = new KillDriverRequest + k.clientSparkVersion = sparkVersion + k.driverId = driverId + k } /** Construct a driver status request message. */ protected override def constructStatusRequest( master: String, driverId: String): DriverStatusRequest = { - new DriverStatusRequest() - .setSparkVersion(sparkVersion) - .setDriverId(driverId) + val d = new DriverStatusRequest + d.clientSparkVersion = sparkVersion + d.driverId = driverId + d } /** Extract the URL portion of the master address. */ diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 802933bf3f2ae..de0f701fd3258 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -58,39 +58,42 @@ private[spark] class StandaloneRestServerHandler( val driverDescription = buildDriverDescription(request) val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse]( DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout) - new SubmitDriverResponse() - .setSparkVersion(sparkVersion) - .setMessage(response.message) - .setSuccess(response.success.toString) - .setDriverId(response.driverId.orNull) + val s = new SubmitDriverResponse + s.serverSparkVersion = sparkVersion + s.message = response.message + s.success = response.success.toString + s.driverId = response.driverId.orNull + s } /** Handle a request to kill a driver. */ protected override def handleKill(request: KillDriverRequest): KillDriverResponse = { - val driverId = request.getDriverId + val driverId = request.driverId val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse]( DeployMessages.RequestKillDriver(driverId), masterActor, askTimeout) - new KillDriverResponse() - .setSparkVersion(sparkVersion) - .setMessage(response.message) - .setDriverId(driverId) - .setSuccess(response.success.toString) + val k = new KillDriverResponse + k.serverSparkVersion = sparkVersion + k.message = response.message + k.driverId = driverId + k.success = response.success.toString + k } /** Handle a request for a driver's status. */ protected override def handleStatus(request: DriverStatusRequest): DriverStatusResponse = { - val driverId = request.getDriverId + val driverId = request.driverId val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse]( DeployMessages.RequestDriverStatus(driverId), masterActor, askTimeout) val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) } - new DriverStatusResponse() - .setSparkVersion(sparkVersion) - .setDriverId(driverId) - .setSuccess(response.found.toString) - .setDriverState(response.state.map(_.toString).orNull) - .setWorkerId(response.workerId.orNull) - .setWorkerHostPort(response.workerHostPort.orNull) - .setMessage(message.orNull) + val d = new DriverStatusResponse + d.serverSparkVersion = sparkVersion + d.driverId = driverId + d.success = response.found.toString + d.driverState = response.state.map(_.toString).orNull + d.workerId = response.workerId.orNull + d.workerHostPort = response.workerHostPort.orNull + d.message = message.orNull + d } /** @@ -101,27 +104,27 @@ private[spark] class StandaloneRestServerHandler( */ private def buildDriverDescription(request: SubmitDriverRequest): DriverDescription = { // Required fields, including the main class because python is not yet supported - val appName = request.getAppName - val appResource = request.getAppResource - val mainClass = request.getMainClass + val appName = request.appName + val appResource = request.appResource + val mainClass = request.mainClass if (mainClass == null) { throw new SubmitRestMissingFieldException("Main class must be set in submit request.") } // Optional fields - val jars = Option(request.getJars) - val files = Option(request.getFiles) - val driverMemory = Option(request.getDriverMemory) - val driverCores = Option(request.getDriverCores) - val driverExtraJavaOptions = Option(request.getDriverExtraJavaOptions) - val driverExtraClassPath = Option(request.getDriverExtraClassPath) - val driverExtraLibraryPath = Option(request.getDriverExtraLibraryPath) - val superviseDriver = Option(request.getSuperviseDriver) - val executorMemory = Option(request.getExecutorMemory) - val totalExecutorCores = Option(request.getTotalExecutorCores) - val appArgs = request.getAppArgs - val sparkProperties = request.getSparkProperties - val environmentVariables = request.getEnvironmentVariables + val jars = Option(request.jars) + val files = Option(request.files) + val driverMemory = Option(request.driverMemory) + val driverCores = Option(request.driverCores) + val driverExtraJavaOptions = Option(request.driverExtraJavaOptions) + val driverExtraClassPath = Option(request.driverExtraClassPath) + val driverExtraLibraryPath = Option(request.driverExtraLibraryPath) + val superviseDriver = Option(request.superviseDriver) + val executorMemory = Option(request.executorMemory) + val totalExecutorCores = Option(request.totalExecutorCores) + val appArgs = request.appArgs + val sparkProperties = request.sparkProperties + val environmentVariables = request.environmentVariables // Translate all fields to the relevant Spark properties val conf = new SparkConf(false) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequest.scala index c5daa616860e2..d6bc050285b96 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequest.scala @@ -20,127 +20,51 @@ package org.apache.spark.deploy.rest import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty} -import org.json4s.jackson.JsonMethods._ - -import org.apache.spark.util.JsonProtocol +import com.fasterxml.jackson.annotation.{JsonProperty, JsonIgnore, JsonInclude} /** * A request to submit a driver in the REST application submission protocol. */ class SubmitDriverRequest extends SubmitRestProtocolRequest { - private val appName = new SubmitRestProtocolField[String] - private val appResource = new SubmitRestProtocolField[String] - private val mainClass = new SubmitRestProtocolField[String] - private val jars = new SubmitRestProtocolField[String] - private val files = new SubmitRestProtocolField[String] - private val pyFiles = new SubmitRestProtocolField[String] - private val driverMemory = new SubmitRestProtocolField[String] - private val driverCores = new SubmitRestProtocolField[Int] - private val driverExtraJavaOptions = new SubmitRestProtocolField[String] - private val driverExtraClassPath = new SubmitRestProtocolField[String] - private val driverExtraLibraryPath = new SubmitRestProtocolField[String] - private val superviseDriver = new SubmitRestProtocolField[Boolean] - private val executorMemory = new SubmitRestProtocolField[String] - private val totalExecutorCores = new SubmitRestProtocolField[Int] + var appName: String = null + var appResource: String = null + var mainClass: String = null + var jars: String = null + var files: String = null + var pyFiles: String = null + var driverMemory: String = null + var driverCores: String = null + var driverExtraJavaOptions: String = null + var driverExtraClassPath: String = null + var driverExtraLibraryPath: String = null + var superviseDriver: String = null + var executorMemory: String = null + var totalExecutorCores: String = null // Special fields - private val appArgs = new ArrayBuffer[String] - private val sparkProperties = new mutable.HashMap[String, String] - private val envVars = new mutable.HashMap[String, String] - - def getAppName: String = appName.toString - def getAppResource: String = appResource.toString - def getMainClass: String = mainClass.toString - def getJars: String = jars.toString - def getFiles: String = files.toString - def getPyFiles: String = pyFiles.toString - def getDriverMemory: String = driverMemory.toString - def getDriverCores: String = driverCores.toString - def getDriverExtraJavaOptions: String = driverExtraJavaOptions.toString - def getDriverExtraClassPath: String = driverExtraClassPath.toString - def getDriverExtraLibraryPath: String = driverExtraLibraryPath.toString - def getSuperviseDriver: String = superviseDriver.toString - def getExecutorMemory: String = executorMemory.toString - def getTotalExecutorCores: String = totalExecutorCores.toString - - // Special getters required for JSON serialization @JsonProperty("appArgs") - private def getAppArgsJson: String = arrayToJson(getAppArgs) + private val _appArgs = new ArrayBuffer[String] @JsonProperty("sparkProperties") - private def getSparkPropertiesJson: String = mapToJson(getSparkProperties) + private val _sparkProperties = new mutable.HashMap[String, String] @JsonProperty("environmentVariables") - private def getEnvironmentVariablesJson: String = mapToJson(getEnvironmentVariables) + private val _envVars = new mutable.HashMap[String, String] - def setAppName(s: String): this.type = setField(appName, s) - def setAppResource(s: String): this.type = setField(appResource, s) - def setMainClass(s: String): this.type = setField(mainClass, s) - def setJars(s: String): this.type = setField(jars, s) - def setFiles(s: String): this.type = setField(files, s) - def setPyFiles(s: String): this.type = setField(pyFiles, s) - def setDriverMemory(s: String): this.type = setField(driverMemory, s) - def setDriverCores(s: String): this.type = setNumericField(driverCores, s) - def setDriverExtraJavaOptions(s: String): this.type = setField(driverExtraJavaOptions, s) - def setDriverExtraClassPath(s: String): this.type = setField(driverExtraClassPath, s) - def setDriverExtraLibraryPath(s: String): this.type = setField(driverExtraLibraryPath, s) - def setSuperviseDriver(s: String): this.type = setBooleanField(superviseDriver, s) - def setExecutorMemory(s: String): this.type = setField(executorMemory, s) - def setTotalExecutorCores(s: String): this.type = setNumericField(totalExecutorCores, s) - - // Special setters required for JSON deserialization - @JsonProperty("appArgs") - private def setAppArgsJson(s: String): Unit = { - appArgs.clear() - appArgs ++= JsonProtocol.arrayFromJson(parse(s)) - } - @JsonProperty("sparkProperties") - private def setSparkPropertiesJson(s: String): Unit = { - sparkProperties.clear() - sparkProperties ++= JsonProtocol.mapFromJson(parse(s)) - } - @JsonProperty("environmentVariables") - private def setEnvironmentVariablesJson(s: String): Unit = { - envVars.clear() - envVars ++= JsonProtocol.mapFromJson(parse(s)) - } + def appArgs: Array[String] = _appArgs.toArray + def sparkProperties: Map[String, String] = _sparkProperties.toMap + def environmentVariables: Map[String, String] = _envVars.toMap - /** Return an array of arguments to be passed to the application. */ - @JsonIgnore - def getAppArgs: Array[String] = appArgs.toArray - - /** Return a map of Spark properties to be passed to the application as java options. */ - @JsonIgnore - def getSparkProperties: Map[String, String] = sparkProperties.toMap - - /** Return a map of environment variables to be passed to the application. */ - @JsonIgnore - def getEnvironmentVariables: Map[String, String] = envVars.toMap - - /** Add a command line argument to be passed to the application. */ - @JsonIgnore - def addAppArg(s: String): this.type = { appArgs += s; this } - - /** Set a Spark property to be passed to the application as a java option. */ - @JsonIgnore - def setSparkProperty(k: String, v: String): this.type = { sparkProperties(k) = v; this } - - /** Set an environment variable to be passed to the application. */ - @JsonIgnore - def setEnvironmentVariable(k: String, v: String): this.type = { envVars(k) = v; this } - - /** Serialize the given Array to a compact JSON string. */ - private def arrayToJson(arr: Array[String]): String = { - if (arr.nonEmpty) { compact(render(JsonProtocol.arrayToJson(arr))) } else null - } - - /** Serialize the given Map to a compact JSON string. */ - private def mapToJson(map: Map[String, String]): String = { - if (map.nonEmpty) { compact(render(JsonProtocol.mapToJson(map))) } else null - } + def addAppArg(s: String): this.type = { _appArgs += s; this } + def setSparkProperty(k: String, v: String): this.type = { _sparkProperties(k) = v; this } + def setEnvironmentVariable(k: String, v: String): this.type = { _envVars(k) = v; this } protected override def doValidate(): Unit = { super.doValidate() assertFieldIsSet(appName, "appName") assertFieldIsSet(appResource, "appResource") + assertFieldIsMemory(driverMemory, "driverMemory") + assertFieldIsNumeric(driverCores, "driverCores") + assertFieldIsBoolean(superviseDriver, "superviseDriver") + assertFieldIsMemory(executorMemory, "executorMemory") + assertFieldIsNumeric(totalExecutorCores, "totalExecutorCores") } } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponse.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponse.scala index 1ac769f3110a0..d2b60aac2f74c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponse.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponse.scala @@ -21,7 +21,5 @@ package org.apache.spark.deploy.rest * A response to the [[SubmitDriverRequest]] in the REST application submission protocol. */ class SubmitDriverResponse extends SubmitRestProtocolResponse { - private val driverId = new SubmitRestProtocolField[String] - def getDriverId: String = driverId.toString - def setDriverId(s: String): this.type = setField(driverId, s) + var driverId: String = null } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestClient.scala index ea056351bcf80..9af29a41e2288 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestClient.scala @@ -100,7 +100,7 @@ private[spark] abstract class SubmitRestClient extends Logging { try { response.validate() response match { - case e: ErrorResponse => logError(s"Server responded with error:\n${e.getMessage}") + case e: ErrorResponse => logError(s"Server responded with error:\n${e.message}") case _ => } } catch { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolField.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolField.scala deleted file mode 100644 index 3e7208c4d8c35..0000000000000 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolField.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.rest - -/** - * A field used in [[SubmitRestProtocolMessage]]s. - */ -class SubmitRestProtocolField[T] { - protected var value: Option[T] = None - def isSet: Boolean = value.isDefined - def getValue: Option[T] = value - def setValue(v: T): Unit = { value = Some(v) } - def clearValue(): Unit = { value = None } - override def toString: String = value.map(_.toString).orNull -} diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala index d2f12a5e863d8..82d7f86ecb903 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolMessage.scala @@ -17,10 +17,13 @@ package org.apache.spark.deploy.rest +import scala.util.Try + import com.fasterxml.jackson.annotation._ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} +import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ @@ -39,23 +42,14 @@ import org.apache.spark.util.Utils @JsonAutoDetect(getterVisibility = Visibility.ANY, setterVisibility = Visibility.ANY) @JsonPropertyOrder(alphabetic = true) abstract class SubmitRestProtocolMessage { + @JsonIgnore val messageType = Utils.getFormattedClassName(this) - protected val action: String = messageType - protected val sparkVersion: SubmitRestProtocolField[String] - protected val message = new SubmitRestProtocolField[String] - // Required for JSON de/serialization and not explicitly used - private def getAction: String = action - private def setAction(s: String): this.type = this + val action: String = messageType + var message: String = null - // Intended for the user and not for JSON de/serialization, which expects more specific keys - @JsonIgnore - def getSparkVersion: String - @JsonIgnore - def setSparkVersion(s: String): this.type - - def getMessage: String = message.toString - def setMessage(s: String): this.type = setField(message, s) + // For JSON deserialization + private def setAction(a: String): Unit = { } /** * Serialize the message to JSON. @@ -63,9 +57,7 @@ abstract class SubmitRestProtocolMessage { */ def toJson: String = { validate() - val mapper = new ObjectMapper - mapper.enable(SerializationFeature.INDENT_OUTPUT) - mapper.writeValueAsString(this) + SubmitRestProtocolMessage.mapper.writeValueAsString(this) } /** @@ -89,53 +81,52 @@ abstract class SubmitRestProtocolMessage { } /** Assert that the specified field is set in this message. */ - protected def assertFieldIsSet(field: SubmitRestProtocolField[_], name: String): Unit = { - if (!field.isSet) { + protected def assertFieldIsSet(value: String, name: String): Unit = { + if (value == null) { throw new SubmitRestMissingFieldException( s"Field '$name' is missing in message $messageType.") } } - /** - * Assert a condition when validating this message. - * If the assertion fails, throw a [[SubmitRestProtocolException]]. - */ - protected def assert(condition: Boolean, failMessage: String): Unit = { - if (!condition) { throw new SubmitRestProtocolException(failMessage) } - } - - /** Set the field to the given value, or clear the field if the value is null. */ - protected def setField(f: SubmitRestProtocolField[String], v: String): this.type = { - if (v == null) { f.clearValue() } else { f.setValue(v) } - this + /** Assert that the value of the specified field is a boolean. */ + protected def assertFieldIsBoolean(value: String, name: String): Unit = { + if (value != null) { + Try(value.toBoolean).getOrElse { + throw new SubmitRestProtocolException( + s"Field '$name' expected boolean value: actual was '$value'.") + } + } } - /** - * Set the field to the given boolean value, or clear the field if the value is null. - * If the provided value does not represent a boolean, throw an exception. - */ - protected def setBooleanField(f: SubmitRestProtocolField[Boolean], v: String): this.type = { - if (v == null) { f.clearValue() } else { f.setValue(v.toBoolean) } - this + /** Assert that the value of the specified field is a numeric. */ + protected def assertFieldIsNumeric(value: String, name: String): Unit = { + if (value != null) { + Try(value.toInt).getOrElse { + throw new SubmitRestProtocolException( + s"Field '$name' expected numeric value: actual was '$value'.") + } + } } /** - * Set the field to the given numeric value, or clear the field if the value is null. - * If the provided value does not represent a numeric, throw an exception. + * Assert that the value of the specified field is a memory string. + * Examples of valid memory strings include 3g, 512m, 128k, 4096. */ - protected def setNumericField(f: SubmitRestProtocolField[Int], v: String): this.type = { - if (v == null) { f.clearValue() } else { f.setValue(v.toInt) } - this + protected def assertFieldIsMemory(value: String, name: String): Unit = { + if (value != null) { + Try(Utils.memoryStringToMb(value)).getOrElse { + throw new SubmitRestProtocolException( + s"Field '$name' expected memory value: actual was '$value'.") + } + } } /** - * Set the field to the given memory value, or clear the field if the value is null. - * If the provided value does not represent a memory value, throw an exception. - * Valid examples of memory values include "512m", "24g", and "128000". + * Assert a condition when validating this message. + * If the assertion fails, throw a [[SubmitRestProtocolException]]. */ - protected def setMemoryField(f: SubmitRestProtocolField[String], v: String): this.type = { - Utils.memoryStringToMb(v) - setField(f, v) + protected def assert(condition: Boolean, failMessage: String): Unit = { + if (!condition) { throw new SubmitRestProtocolException(failMessage) } } } @@ -143,14 +134,10 @@ abstract class SubmitRestProtocolMessage { * An abstract request sent from the client in the REST application submission protocol. */ abstract class SubmitRestProtocolRequest extends SubmitRestProtocolMessage { - protected override val sparkVersion = new SubmitRestProtocolField[String] - def getClientSparkVersion: String = sparkVersion.toString - def setClientSparkVersion(s: String): this.type = setField(sparkVersion, s) - override def getSparkVersion: String = getClientSparkVersion - override def setSparkVersion(s: String) = setClientSparkVersion(s) + var clientSparkVersion: String = null protected override def doValidate(): Unit = { super.doValidate() - assertFieldIsSet(sparkVersion, "clientSparkVersion") + assertFieldIsSet(clientSparkVersion, "clientSparkVersion") } } @@ -158,27 +145,21 @@ abstract class SubmitRestProtocolRequest extends SubmitRestProtocolMessage { * An abstract response sent from the server in the REST application submission protocol. */ abstract class SubmitRestProtocolResponse extends SubmitRestProtocolMessage { - protected override val sparkVersion = new SubmitRestProtocolField[String] - private val success = new SubmitRestProtocolField[Boolean] - - override def getSparkVersion: String = getServerSparkVersion - def getServerSparkVersion: String = sparkVersion.toString - def getSuccess: String = success.toString - - override def setSparkVersion(s: String) = setServerSparkVersion(s) - def setServerSparkVersion(s: String): this.type = setField(sparkVersion, s) - def setSuccess(s: String): this.type = setBooleanField(success, s) - + var serverSparkVersion: String = null + var success: String = null protected override def doValidate(): Unit = { super.doValidate() - assertFieldIsSet(sparkVersion, "serverSparkVersion") + assertFieldIsSet(serverSparkVersion, "serverSparkVersion") assertFieldIsSet(success, "success") + assertFieldIsBoolean(success, "success") } } object SubmitRestProtocolMessage { - private val mapper = new ObjectMapper private val packagePrefix = this.getClass.getPackage.getName + private val mapper = new ObjectMapper() + .registerModule(DefaultScalaModule) + .enable(SerializationFeature.INDENT_OUTPUT) /** * Parse the value of the action field from the given JSON. diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala index 027f641aa8f9f..6c2a3a159da8e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestServer.scala @@ -137,9 +137,10 @@ private[spark] abstract class SubmitRestServerHandler extends AbstractHandler wi /** Construct an error message to signal the fact that an exception has been thrown. */ private def handleError(message: String): ErrorResponse = { - new ErrorResponse() - .setSparkVersion(sparkVersion) - .setMessage(message) + val e = new ErrorResponse + e.serverSparkVersion = sparkVersion + e.message = message + e } /** Return a human readable String representation of the exception. */ diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestProtocolSuite.scala index 2a147c77c9cd2..fa994118883f3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestProtocolSuite.scala @@ -61,7 +61,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B test("kill empty driver") { val response = client.killDriver(masterRestUrl, "driver-that-does-not-exist") val killResponse = getKillResponse(response) - val killSuccess = killResponse.getSuccess + val killSuccess = killResponse.success assert(killSuccess === "false") } @@ -72,12 +72,12 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B val driverId = submitApplication(resultsFile, numbers, size) val response = client.killDriver(masterRestUrl, driverId) val killResponse = getKillResponse(response) - val killSuccess = killResponse.getSuccess + val killSuccess = killResponse.success waitUntilFinished(driverId) val response2 = client.requestDriverStatus(masterRestUrl, driverId) val statusResponse = getStatusResponse(response2) - val statusSuccess = statusResponse.getSuccess - val driverState = statusResponse.getDriverState + val statusSuccess = statusResponse.success + val driverState = statusResponse.driverState assert(killSuccess === "true") assert(statusSuccess === "true") assert(driverState === DriverState.KILLED.toString) @@ -88,7 +88,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B test("request status for empty driver") { val response = client.requestDriverStatus(masterRestUrl, "driver-that-does-not-exist") val statusResponse = getStatusResponse(response) - val statusSuccess = statusResponse.getSuccess + val statusSuccess = statusResponse.success assert(statusSuccess === "false") } @@ -131,7 +131,9 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B SparkSubmit.prepareSubmitEnvironment(args) val response = client.submitDriver(args) val submitResponse = getSubmitResponse(response) - submitResponse.getDriverId + val driverId = submitResponse.driverId + assert(driverId != null, "Application submission was unsuccessful!") + driverId } /** Wait until the given driver has finished running up to the specified timeout. */ @@ -141,7 +143,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B while (!finished) { val response = client.requestDriverStatus(masterRestUrl, driverId) val statusResponse = getStatusResponse(response) - val driverState = statusResponse.getDriverState + val driverState = statusResponse.driverState finished = driverState != DriverState.SUBMITTED.toString && driverState != DriverState.RUNNING.toString @@ -155,7 +157,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B private def getSubmitResponse(response: SubmitRestProtocolResponse): SubmitDriverResponse = { response match { case s: SubmitDriverResponse => s - case e: ErrorResponse => fail(s"Server returned error: ${e.toJson}") + case e: ErrorResponse => fail(s"Server returned error: ${e.message}") case r => fail(s"Expected submit response. Actual: ${r.toJson}") } } @@ -164,7 +166,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B private def getKillResponse(response: SubmitRestProtocolResponse): KillDriverResponse = { response match { case k: KillDriverResponse => k - case e: ErrorResponse => fail(s"Server returned error: ${e.toJson}") + case e: ErrorResponse => fail(s"Server returned error: ${e.message}") case r => fail(s"Expected kill response. Actual: ${r.toJson}") } } @@ -173,7 +175,7 @@ class StandaloneRestProtocolSuite extends FunSuite with BeforeAndAfterAll with B private def getStatusResponse(response: SubmitRestProtocolResponse): DriverStatusResponse = { response match { case s: DriverStatusResponse => s - case e: ErrorResponse => fail(s"Server returned error: ${e.toJson}") + case e: ErrorResponse => fail(s"Server returned error: ${e.message}") case r => fail(s"Expected status response. Actual: ${r.toJson}") } } diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala index f00f7848befc7..97158a2cecda8 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala @@ -25,124 +25,106 @@ import org.scalatest.FunSuite */ class SubmitRestProtocolSuite extends FunSuite { - test("get and set fields") { - val request = new DummyRequest - assert(request.getSparkVersion === null) - assert(request.getMessage === null) - assert(request.getActive === null) - assert(request.getAge === null) - assert(request.getName === null) - request.setSparkVersion("1.2.3") - request.setActive("true") - request.setAge("10") - request.setName("dolphin") - assert(request.getSparkVersion === "1.2.3") - assert(request.getMessage === null) - assert(request.getActive === "true") - assert(request.getAge === "10") - assert(request.getName === "dolphin") - // overwrite - request.setName("shark") - request.setActive("false") - assert(request.getName === "shark") - assert(request.getActive === "false") - } - - test("get and set fields with null values") { - val request = new DummyRequest - request.setSparkVersion(null) - request.setActive(null) - request.setAge(null) - request.setName(null) - request.setMessage(null) - assert(request.getSparkVersion === null) - assert(request.getMessage === null) - assert(request.getActive === null) - assert(request.getAge === null) - assert(request.getName === null) - } - - test("set fields with illegal argument") { - val request = new DummyRequest - intercept[IllegalArgumentException] { request.setActive("not-a-boolean") } - intercept[IllegalArgumentException] { request.setActive("150") } - intercept[IllegalArgumentException] { request.setAge("not-a-number") } - intercept[IllegalArgumentException] { request.setAge("true") } - } - test("validate") { val request = new DummyRequest intercept[SubmitRestProtocolException] { request.validate() } // missing everything - request.setSparkVersion("1.4.8") + request.clientSparkVersion = "1.2.3" intercept[SubmitRestProtocolException] { request.validate() } // missing name and age - request.setName("something") + request.name = "something" intercept[SubmitRestProtocolException] { request.validate() } // missing only age - request.setAge("2") + request.age = "2" intercept[SubmitRestProtocolException] { request.validate() } // age too low - request.setAge("10") - request.validate() // everything is set - request.setSparkVersion(null) + request.age = "10" + request.validate() // everything is set properly + request.clientSparkVersion = null intercept[SubmitRestProtocolException] { request.validate() } // missing only Spark version - request.setSparkVersion("1.2.3") - request.setName(null) + request.clientSparkVersion = "1.2.3" + request.name = null intercept[SubmitRestProtocolException] { request.validate() } // missing only name - request.setMessage("not-setting-name") + request.message = "not-setting-name" intercept[SubmitRestProtocolException] { request.validate() } // still missing name } + test("validate with illegal argument") { + val request = new DummyRequest + request.clientSparkVersion = "1.2.3" + request.name = "abc" + request.age = "not-a-number" + intercept[SubmitRestProtocolException] { request.validate() } + request.age = "true" + intercept[SubmitRestProtocolException] { request.validate() } + request.age = "150" + request.validate() + request.active = "not-a-boolean" + intercept[SubmitRestProtocolException] { request.validate() } + request.active = "150" + intercept[SubmitRestProtocolException] { request.validate() } + request.active = "true" + request.validate() + } + test("request to and from JSON") { - val request = new DummyRequest() - .setSparkVersion("1.2.3") - .setActive("true") - .setAge("25") - .setName("jung") + val request = new DummyRequest + intercept[SubmitRestProtocolException] { request.toJson } // implicit validation + request.clientSparkVersion = "1.2.3" + request.active = "true" + request.age = "25" + request.name = "jung" val json = request.toJson assertJsonEquals(json, dummyRequestJson) val newRequest = SubmitRestProtocolMessage.fromJson(json, classOf[DummyRequest]) - assert(newRequest.getSparkVersion === "1.2.3") - assert(newRequest.getClientSparkVersion === "1.2.3") - assert(newRequest.getActive === "true") - assert(newRequest.getAge === "25") - assert(newRequest.getName === "jung") - assert(newRequest.getMessage === null) + assert(newRequest.clientSparkVersion === "1.2.3") + assert(newRequest.clientSparkVersion === "1.2.3") + assert(newRequest.active === "true") + assert(newRequest.age === "25") + assert(newRequest.name === "jung") + assert(newRequest.message === null) } test("response to and from JSON") { - val response = new DummyResponse() - .setSparkVersion("3.3.4") - .setSuccess("true") + val response = new DummyResponse + response.serverSparkVersion = "3.3.4" + response.success = "true" val json = response.toJson assertJsonEquals(json, dummyResponseJson) val newResponse = SubmitRestProtocolMessage.fromJson(json, classOf[DummyResponse]) - assert(newResponse.getSparkVersion === "3.3.4") - assert(newResponse.getServerSparkVersion === "3.3.4") - assert(newResponse.getSuccess === "true") - assert(newResponse.getMessage === null) + assert(newResponse.serverSparkVersion === "3.3.4") + assert(newResponse.serverSparkVersion === "3.3.4") + assert(newResponse.success === "true") + assert(newResponse.message === null) } test("SubmitDriverRequest") { val message = new SubmitDriverRequest intercept[SubmitRestProtocolException] { message.validate() } - intercept[IllegalArgumentException] { message.setDriverCores("one hundred feet") } - intercept[IllegalArgumentException] { message.setSuperviseDriver("nope, never") } - intercept[IllegalArgumentException] { message.setTotalExecutorCores("two men") } - message.setSparkVersion("1.2.3") - message.setAppName("SparkPie") - message.setAppResource("honey-walnut-cherry.jar") + message.clientSparkVersion = "1.2.3" + message.appName = "SparkPie" + message.appResource = "honey-walnut-cherry.jar" message.validate() // optional fields - message.setMainClass("org.apache.spark.examples.SparkPie") - message.setJars("mayonnaise.jar,ketchup.jar") - message.setFiles("fireball.png") - message.setPyFiles("do-not-eat-my.py") - message.setDriverMemory("512m") - message.setDriverCores("180") - message.setDriverExtraJavaOptions(" -Dslices=5 -Dcolor=mostly_red") - message.setDriverExtraClassPath("food-coloring.jar") - message.setDriverExtraLibraryPath("pickle.jar") - message.setSuperviseDriver("false") - message.setExecutorMemory("256m") - message.setTotalExecutorCores("10000") + message.mainClass = "org.apache.spark.examples.SparkPie" + message.jars = "mayonnaise.jar,ketchup.jar" + message.files = "fireball.png" + message.pyFiles = "do-not-eat-my.py" + message.driverMemory = "512m" + message.driverCores = "180" + message.driverExtraJavaOptions = " -Dslices=5 -Dcolor=mostly_red" + message.driverExtraClassPath = "food-coloring.jar" + message.driverExtraLibraryPath = "pickle.jar" + message.superviseDriver = "false" + message.executorMemory = "256m" + message.totalExecutorCores = "10000" + message.validate() + // bad fields + message.driverCores = "one hundred feet" + intercept[SubmitRestProtocolException] { message.validate() } + message.driverCores = "180" + message.superviseDriver = "nope, never" + intercept[SubmitRestProtocolException] { message.validate() } + message.superviseDriver = "false" + message.totalExecutorCores = "two men" + intercept[SubmitRestProtocolException] { message.validate() } + message.totalExecutorCores = "10000" // special fields message.addAppArg("two slices") message.addAppArg("a hint of cinnamon") @@ -150,142 +132,144 @@ class SubmitRestProtocolSuite extends FunSuite { message.setSparkProperty("spark.shuffle.enabled", "false") message.setEnvironmentVariable("PATH", "/dev/null") message.setEnvironmentVariable("PYTHONPATH", "/dev/null") - assert(message.getAppArgs === Seq("two slices", "a hint of cinnamon")) - assert(message.getSparkProperties.size === 2) - assert(message.getSparkProperties("spark.live.long") === "true") - assert(message.getSparkProperties("spark.shuffle.enabled") === "false") - assert(message.getEnvironmentVariables.size === 2) - assert(message.getEnvironmentVariables("PATH") === "/dev/null") - assert(message.getEnvironmentVariables("PYTHONPATH") === "/dev/null") + assert(message.appArgs === Seq("two slices", "a hint of cinnamon")) + assert(message.sparkProperties.size === 2) + assert(message.sparkProperties("spark.live.long") === "true") + assert(message.sparkProperties("spark.shuffle.enabled") === "false") + assert(message.environmentVariables.size === 2) + assert(message.environmentVariables("PATH") === "/dev/null") + assert(message.environmentVariables("PYTHONPATH") === "/dev/null") // test JSON val json = message.toJson assertJsonEquals(json, submitDriverRequestJson) val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[SubmitDriverRequest]) - assert(newMessage.getSparkVersion === "1.2.3") - assert(newMessage.getClientSparkVersion === "1.2.3") - assert(newMessage.getAppName === "SparkPie") - assert(newMessage.getAppResource === "honey-walnut-cherry.jar") - assert(newMessage.getMainClass === "org.apache.spark.examples.SparkPie") - assert(newMessage.getJars === "mayonnaise.jar,ketchup.jar") - assert(newMessage.getFiles === "fireball.png") - assert(newMessage.getPyFiles === "do-not-eat-my.py") - assert(newMessage.getDriverMemory === "512m") - assert(newMessage.getDriverCores === "180") - assert(newMessage.getDriverExtraJavaOptions === " -Dslices=5 -Dcolor=mostly_red") - assert(newMessage.getDriverExtraClassPath === "food-coloring.jar") - assert(newMessage.getDriverExtraLibraryPath === "pickle.jar") - assert(newMessage.getSuperviseDriver === "false") - assert(newMessage.getExecutorMemory === "256m") - assert(newMessage.getTotalExecutorCores === "10000") - assert(newMessage.getAppArgs === message.getAppArgs) - assert(newMessage.getSparkProperties === message.getSparkProperties) - assert(newMessage.getEnvironmentVariables === message.getEnvironmentVariables) + assert(newMessage.clientSparkVersion === "1.2.3") + assert(newMessage.appName === "SparkPie") + assert(newMessage.appResource === "honey-walnut-cherry.jar") + assert(newMessage.mainClass === "org.apache.spark.examples.SparkPie") + assert(newMessage.jars === "mayonnaise.jar,ketchup.jar") + assert(newMessage.files === "fireball.png") + assert(newMessage.pyFiles === "do-not-eat-my.py") + assert(newMessage.driverMemory === "512m") + assert(newMessage.driverCores === "180") + assert(newMessage.driverExtraJavaOptions === " -Dslices=5 -Dcolor=mostly_red") + assert(newMessage.driverExtraClassPath === "food-coloring.jar") + assert(newMessage.driverExtraLibraryPath === "pickle.jar") + assert(newMessage.superviseDriver === "false") + assert(newMessage.executorMemory === "256m") + assert(newMessage.totalExecutorCores === "10000") + assert(newMessage.appArgs === message.appArgs) + assert(newMessage.sparkProperties === message.sparkProperties) + assert(newMessage.environmentVariables === message.environmentVariables) } test("SubmitDriverResponse") { val message = new SubmitDriverResponse intercept[SubmitRestProtocolException] { message.validate() } - intercept[IllegalArgumentException] { message.setSuccess("maybe not") } - message.setSparkVersion("1.2.3") - message.setDriverId("driver_123") - message.setSuccess("true") + message.serverSparkVersion = "1.2.3" + message.driverId = "driver_123" + message.success = "true" message.validate() + // bad fields + message.success = "maybe not" + intercept[SubmitRestProtocolException] { message.validate() } + message.success = "true" // test JSON val json = message.toJson assertJsonEquals(json, submitDriverResponseJson) val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[SubmitDriverResponse]) - assert(newMessage.getSparkVersion === "1.2.3") - assert(newMessage.getServerSparkVersion === "1.2.3") - assert(newMessage.getDriverId === "driver_123") - assert(newMessage.getSuccess === "true") + assert(newMessage.serverSparkVersion === "1.2.3") + assert(newMessage.driverId === "driver_123") + assert(newMessage.success === "true") } test("KillDriverRequest") { val message = new KillDriverRequest intercept[SubmitRestProtocolException] { message.validate() } - message.setSparkVersion("1.2.3") - message.setDriverId("driver_123") + message.clientSparkVersion = "1.2.3" + message.driverId = "driver_123" message.validate() // test JSON val json = message.toJson assertJsonEquals(json, killDriverRequestJson) val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[KillDriverRequest]) - assert(newMessage.getSparkVersion === "1.2.3") - assert(newMessage.getClientSparkVersion === "1.2.3") - assert(newMessage.getDriverId === "driver_123") + assert(newMessage.clientSparkVersion === "1.2.3") + assert(newMessage.driverId === "driver_123") } test("KillDriverResponse") { val message = new KillDriverResponse intercept[SubmitRestProtocolException] { message.validate() } - intercept[IllegalArgumentException] { message.setSuccess("maybe not") } - message.setSparkVersion("1.2.3") - message.setDriverId("driver_123") - message.setSuccess("true") + message.serverSparkVersion = "1.2.3" + message.driverId = "driver_123" + message.success = "true" message.validate() + // bad fields + message.success = "maybe not" + intercept[SubmitRestProtocolException] { message.validate() } + message.success = "true" // test JSON val json = message.toJson assertJsonEquals(json, killDriverResponseJson) val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[KillDriverResponse]) - assert(newMessage.getSparkVersion === "1.2.3") - assert(newMessage.getServerSparkVersion === "1.2.3") - assert(newMessage.getDriverId === "driver_123") - assert(newMessage.getSuccess === "true") + assert(newMessage.serverSparkVersion === "1.2.3") + assert(newMessage.driverId === "driver_123") + assert(newMessage.success === "true") } test("DriverStatusRequest") { val message = new DriverStatusRequest intercept[SubmitRestProtocolException] { message.validate() } - message.setSparkVersion("1.2.3") - message.setDriverId("driver_123") + message.clientSparkVersion = "1.2.3" + message.driverId = "driver_123" message.validate() // test JSON val json = message.toJson assertJsonEquals(json, driverStatusRequestJson) val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[DriverStatusRequest]) - assert(newMessage.getSparkVersion === "1.2.3") - assert(newMessage.getClientSparkVersion === "1.2.3") - assert(newMessage.getDriverId === "driver_123") + assert(newMessage.clientSparkVersion === "1.2.3") + assert(newMessage.driverId === "driver_123") } test("DriverStatusResponse") { val message = new DriverStatusResponse intercept[SubmitRestProtocolException] { message.validate() } - intercept[IllegalArgumentException] { message.setSuccess("maybe") } - message.setSparkVersion("1.2.3") - message.setDriverId("driver_123") - message.setSuccess("true") + message.serverSparkVersion = "1.2.3" + message.driverId = "driver_123" + message.success = "true" message.validate() // optional fields - message.setDriverState("RUNNING") - message.setWorkerId("worker_123") - message.setWorkerHostPort("1.2.3.4:7780") + message.driverState = "RUNNING" + message.workerId = "worker_123" + message.workerHostPort = "1.2.3.4:7780" + // bad fields + message.success = "maybe" + intercept[SubmitRestProtocolException] { message.validate() } + message.success = "true" // test JSON val json = message.toJson assertJsonEquals(json, driverStatusResponseJson) val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[DriverStatusResponse]) - assert(newMessage.getSparkVersion === "1.2.3") - assert(newMessage.getServerSparkVersion === "1.2.3") - assert(newMessage.getDriverId === "driver_123") - assert(newMessage.getDriverState === "RUNNING") - assert(newMessage.getSuccess === "true") - assert(newMessage.getWorkerId === "worker_123") - assert(newMessage.getWorkerHostPort === "1.2.3.4:7780") + assert(newMessage.serverSparkVersion === "1.2.3") + assert(newMessage.driverId === "driver_123") + assert(newMessage.driverState === "RUNNING") + assert(newMessage.success === "true") + assert(newMessage.workerId === "worker_123") + assert(newMessage.workerHostPort === "1.2.3.4:7780") } test("ErrorResponse") { val message = new ErrorResponse intercept[SubmitRestProtocolException] { message.validate() } - message.setSparkVersion("1.2.3") - message.setMessage("Field not found in submit request: X") + message.serverSparkVersion = "1.2.3" + message.message = "Field not found in submit request: X" message.validate() // test JSON val json = message.toJson assertJsonEquals(json, errorJson) val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[ErrorResponse]) - assert(newMessage.getSparkVersion === "1.2.3") - assert(newMessage.getServerSparkVersion === "1.2.3") - assert(newMessage.getMessage === "Field not found in submit request: X") + assert(newMessage.serverSparkVersion === "1.2.3") + assert(newMessage.message === "Field not found in submit request: X") } private val dummyRequestJson = @@ -312,7 +296,7 @@ class SubmitRestProtocolSuite extends FunSuite { """ |{ | "action" : "SubmitDriverRequest", - | "appArgs" : "[\"two slices\",\"a hint of cinnamon\"]", + | "appArgs" : ["two slices","a hint of cinnamon"], | "appName" : "SparkPie", | "appResource" : "honey-walnut-cherry.jar", | "clientSparkVersion" : "1.2.3", @@ -321,13 +305,13 @@ class SubmitRestProtocolSuite extends FunSuite { | "driverExtraJavaOptions" : " -Dslices=5 -Dcolor=mostly_red", | "driverExtraLibraryPath" : "pickle.jar", | "driverMemory" : "512m", - | "environmentVariables" : "{\"PATH\":\"/dev/null\",\"PYTHONPATH\":\"/dev/null\"}", + | "environmentVariables" : {"PATH":"/dev/null","PYTHONPATH":"/dev/null"}, | "executorMemory" : "256m", | "files" : "fireball.png", | "jars" : "mayonnaise.jar,ketchup.jar", | "mainClass" : "org.apache.spark.examples.SparkPie", | "pyFiles" : "do-not-eat-my.py", - | "sparkProperties" : "{\"spark.live.long\":\"true\",\"spark.shuffle.enabled\":\"false\"}", + | "sparkProperties" : {"spark.live.long":"true","spark.shuffle.enabled":"false"}, | "superviseDriver" : "false", | "totalExecutorCores" : "10000" |} @@ -389,7 +373,8 @@ class SubmitRestProtocolSuite extends FunSuite { |{ | "action" : "ErrorResponse", | "message" : "Field not found in submit request: X", - | "serverSparkVersion" : "1.2.3" + | "serverSparkVersion" : "1.2.3", + | "success": "false" |} """.stripMargin @@ -407,22 +392,15 @@ class SubmitRestProtocolSuite extends FunSuite { private class DummyResponse extends SubmitRestProtocolResponse private class DummyRequest extends SubmitRestProtocolRequest { - private val active = new SubmitRestProtocolField[Boolean] - private val age = new SubmitRestProtocolField[Int] - private val name = new SubmitRestProtocolField[String] - - def getActive: String = active.toString - def getAge: String = age.toString - def getName: String = name.toString - - def setActive(s: String): this.type = setBooleanField(active, s) - def setAge(s: String): this.type = setNumericField(age, s) - def setName(s: String): this.type = setField(name, s) - + var active: String = null + var age: String = null + var name: String = null protected override def doValidate(): Unit = { super.doValidate() assertFieldIsSet(name, "name") assertFieldIsSet(age, "age") - assert(age.getValue.get > 5, "Not old enough!") + assertFieldIsBoolean(active, "active") + assertFieldIsNumeric(age, "age") + assert(age.toInt > 5, "Not old enough!") } } diff --git a/pom.xml b/pom.xml index 4adfdf3eb8702..ac7cbaae90fde 100644 --- a/pom.xml +++ b/pom.xml @@ -150,6 +150,7 @@ ${scala.version} org.scala-lang 1.8.8 + 2.3.0 1.1.1.6