Skip to content

Commit b44e103

Browse files
author
Andrew Or
committed
Implement status requests + fix validation behavior
This commit makes the StandaloneRestServer actually handle status requests. The existing polling behavior from org.apache.spark.deploy.Client (o.a.s.deploy.Client) is also implemented in the StandaloneRestClient, with amendments. Additionally, the validation behavior was confusing before this commit: previously, the error message seemed to indicate that the user had constructed a malformed message even when the message was constructed on the server side. This commit ensures that the error message is different for these two cases.
1 parent 120ab9d commit b44e103

12 files changed

+130
-59
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ object SparkSubmit {
4949
private val STANDALONE = 2
5050
private val MESOS = 4
5151
private val LOCAL = 8
52-
private val REST = 16
53-
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | REST
52+
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
5453

5554
// Deploy modes
5655
private val CLIENT = 1

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ private[spark] class DriverStatusRequestMessage extends SubmitRestProtocolMessag
4444
private[spark] object DriverStatusRequestMessage
4545
extends SubmitRestProtocolMessageCompanion[DriverStatusRequestMessage] {
4646
protected override def newMessage() = new DriverStatusRequestMessage
47-
protected override def fieldFromString(field: String) = DriverStatusRequestField.fromString(field)
47+
protected override def fieldFromString(f: String) = DriverStatusRequestField.fromString(f)
4848
}

core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ private[spark] object DriverStatusResponseField
2828
case object MESSAGE extends DriverStatusResponseField
2929
case object MASTER extends DriverStatusResponseField
3030
case object DRIVER_ID extends DriverStatusResponseField
31+
case object SUCCESS extends DriverStatusResponseField
32+
// Standalone specific fields
3133
case object DRIVER_STATE extends DriverStatusResponseField
3234
case object WORKER_ID extends DriverStatusResponseField
3335
case object WORKER_HOST_PORT extends DriverStatusResponseField
34-
override val requiredFields = Seq(ACTION, SPARK_VERSION, MESSAGE,
35-
MASTER, DRIVER_ID, DRIVER_STATE, WORKER_ID, WORKER_HOST_PORT)
36-
override val optionalFields = Seq.empty
36+
override val requiredFields = Seq(ACTION, SPARK_VERSION, MASTER, DRIVER_ID, SUCCESS)
37+
override val optionalFields = Seq(MESSAGE, DRIVER_STATE, WORKER_ID, WORKER_HOST_PORT)
3738
}
3839

3940
/**
@@ -48,5 +49,5 @@ private[spark] class DriverStatusResponseMessage extends SubmitRestProtocolMessa
4849
private[spark] object DriverStatusResponseMessage
4950
extends SubmitRestProtocolMessageCompanion[DriverStatusResponseMessage] {
5051
protected override def newMessage() = new DriverStatusResponseMessage
51-
protected override def fieldFromString(field: String) = DriverStatusResponseField.fromString(field)
52+
protected override def fieldFromString(f: String) = DriverStatusResponseField.fromString(f)
5253
}

core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,5 @@ private[spark] class ErrorMessage extends SubmitRestProtocolMessage(
3939

4040
private[spark] object ErrorMessage extends SubmitRestProtocolMessageCompanion[ErrorMessage] {
4141
protected override def newMessage() = new ErrorMessage
42-
protected override def fieldFromString(field: String) = ErrorField.fromString(field)
42+
protected override def fieldFromString(f: String) = ErrorField.fromString(f)
4343
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ private[spark] class KillDriverRequestMessage extends SubmitRestProtocolMessage(
4444
private[spark] object KillDriverRequestMessage
4545
extends SubmitRestProtocolMessageCompanion[KillDriverRequestMessage] {
4646
protected override def newMessage() = new KillDriverRequestMessage
47-
protected override def fieldFromString(field: String) = KillDriverRequestField.fromString(field)
47+
protected override def fieldFromString(f: String) = KillDriverRequestField.fromString(f)
4848
}

core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponseMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ private[spark] class KillDriverResponseMessage extends SubmitRestProtocolMessage
4545
private[spark] object KillDriverResponseMessage
4646
extends SubmitRestProtocolMessageCompanion[KillDriverResponseMessage] {
4747
protected override def newMessage() = new KillDriverResponseMessage
48-
protected override def fieldFromString(field: String) = KillDriverResponseField.fromString(field)
48+
protected override def fieldFromString(f: String) = KillDriverResponseField.fromString(f)
4949
}

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,58 @@ import org.apache.spark.util.Utils
2828
* This client is intended to communicate with the StandaloneRestServer. Cluster mode only.
2929
*/
3030
private[spark] class StandaloneRestClient extends SubmitRestClient {
31+
import StandaloneRestClient._
32+
33+
/**
34+
* Request that the REST server submit a driver specified by the provided arguments.
35+
*
36+
* If the driver was successfully submitted, this polls the status of the driver that was
37+
* just submitted and reports it to the user. Otherwise, if the submission was unsuccessful,
38+
* this reports failure and logs an error message provided by the REST server.
39+
*/
40+
override def submitDriver(args: SparkSubmitArguments): SubmitDriverResponseMessage = {
41+
import SubmitDriverResponseField._
42+
val submitResponse = super.submitDriver(args).asInstanceOf[SubmitDriverResponseMessage]
43+
val submitSuccess = submitResponse.getFieldNotNull(SUCCESS).toBoolean
44+
if (submitSuccess) {
45+
val driverId = submitResponse.getFieldNotNull(DRIVER_ID)
46+
logInfo(s"Driver successfully submitted as $driverId. Polling driver state...")
47+
pollSubmittedDriverStatus(args.master, driverId)
48+
} else {
49+
val submitMessage = submitResponse.getFieldNotNull(MESSAGE)
50+
logError(s"Application submission failed: $submitMessage")
51+
}
52+
submitResponse
53+
}
54+
55+
/**
56+
* Poll the status of the driver that was just submitted and report it.
57+
* This retries up to a fixed number of times until giving up.
58+
*/
59+
private def pollSubmittedDriverStatus(master: String, driverId: String): Unit = {
60+
import DriverStatusResponseField._
61+
(1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
62+
val statusResponse = requestDriverStatus(master, driverId)
63+
.asInstanceOf[DriverStatusResponseMessage]
64+
val statusSuccess = statusResponse.getFieldNotNull(SUCCESS).toBoolean
65+
if (statusSuccess) {
66+
val driverState = statusResponse.getFieldNotNull(DRIVER_STATE)
67+
val workerId = statusResponse.getFieldOption(WORKER_ID)
68+
val workerHostPort = statusResponse.getFieldOption(WORKER_HOST_PORT)
69+
val exception = statusResponse.getFieldOption(MESSAGE)
70+
logInfo(s"State of driver $driverId is now $driverState.")
71+
// Log worker node, if present
72+
(workerId, workerHostPort) match {
73+
case (Some(id), Some(hp)) => logInfo(s"Driver is running on worker $id at $hp.")
74+
case _ =>
75+
}
76+
// Log exception stack trace, if present
77+
exception.foreach { e => logError(e) }
78+
return
79+
}
80+
}
81+
logError(s"Error: Master did not recognize driver $driverId.")
82+
}
3183

3284
/** Construct a submit driver request message. */
3385
override protected def constructSubmitRequest(
@@ -54,7 +106,7 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
54106
args.childArgs.foreach(message.appendAppArg)
55107
args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) }
56108
// TODO: send special environment variables?
57-
message.validate()
109+
message
58110
}
59111

60112
/** Construct a kill driver request message. */
@@ -66,7 +118,6 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
66118
.setField(SPARK_VERSION, sparkVersion)
67119
.setField(MASTER, master)
68120
.setField(DRIVER_ID, driverId)
69-
.validate()
70121
}
71122

72123
/** Construct a driver status request message. */
@@ -78,7 +129,6 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
78129
.setField(SPARK_VERSION, sparkVersion)
79130
.setField(MASTER, master)
80131
.setField(DRIVER_ID, driverId)
81-
.validate()
82132
}
83133

84134
/** Throw an exception if this is not standalone mode. */
@@ -101,3 +151,8 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
101151
new URL("http://" + master.stripPrefix("spark://"))
102152
}
103153
}
154+
155+
private object StandaloneRestClient {
156+
val REPORT_DRIVER_STATUS_INTERVAL = 1000
157+
val REPORT_DRIVER_STATUS_MAX_TRIES = 10
158+
}

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ private[spark] class StandaloneRestServerHandler(
6767
.setField(MASTER, masterUrl)
6868
.setField(SUCCESS, response.success.toString)
6969
.setFieldIfNotNull(DRIVER_ID, response.driverId.orNull)
70-
.validate()
7170
}
7271

7372
/** Handle a request to kill a driver. */
@@ -83,23 +82,29 @@ private[spark] class StandaloneRestServerHandler(
8382
.setField(MASTER, masterUrl)
8483
.setField(DRIVER_ID, driverId)
8584
.setField(SUCCESS, response.success.toString)
86-
.validate()
8785
}
8886

8987
/** Handle a request for a driver's status. */
9088
override protected def handleStatus(
9189
request: DriverStatusRequestMessage): DriverStatusResponseMessage = {
9290
import DriverStatusResponseField._
93-
// TODO: Actually look up the status of the driver
94-
val master = request.getField(DriverStatusRequestField.MASTER)
9591
val driverId = request.getField(DriverStatusRequestField.DRIVER_ID)
96-
val driverState = "HEALTHY"
92+
val response = AkkaUtils.askWithReply[DriverStatusResponse](
93+
RequestDriverStatus(driverId), masterActor, askTimeout)
94+
// Format exception nicely, if it exists
95+
val message = response.exception.map { e =>
96+
val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
97+
s"Exception from the cluster:\n$e\n$stackTraceString"
98+
}
9799
new DriverStatusResponseMessage()
98100
.setField(SPARK_VERSION, sparkVersion)
99-
.setField(MASTER, master)
101+
.setField(MASTER, masterUrl)
100102
.setField(DRIVER_ID, driverId)
101-
.setField(DRIVER_STATE, driverState)
102-
.validate()
103+
.setField(SUCCESS, response.found.toString)
104+
.setFieldIfNotNull(DRIVER_STATE, response.state.map(_.toString).orNull)
105+
.setFieldIfNotNull(WORKER_ID, response.workerId.orNull)
106+
.setFieldIfNotNull(WORKER_HOST_PORT, response.workerHostPort.orNull)
107+
.setFieldIfNotNull(MESSAGE, message.orNull)
103108
}
104109

105110
/**

core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverRequestMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private[spark] object SubmitDriverRequestMessage
105105
import SubmitDriverRequestField._
106106

107107
protected override def newMessage() = new SubmitDriverRequestMessage
108-
protected override def fieldFromString(field: String) = SubmitDriverRequestField.fromString(field)
108+
protected override def fieldFromString(f: String) = SubmitDriverRequestField.fromString(f)
109109

110110
/**
111111
* Process the given field and value appropriately based on the type of the field.

core/src/main/scala/org/apache/spark/deploy/rest/SubmitDriverResponseMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ private[spark] class SubmitDriverResponseMessage extends SubmitRestProtocolMessa
4545
private[spark] object SubmitDriverResponseMessage
4646
extends SubmitRestProtocolMessageCompanion[SubmitDriverResponseMessage] {
4747
protected override def newMessage() = new SubmitDriverResponseMessage
48-
protected override def fieldFromString(field: String) = SubmitDriverResponseField.fromString(field)
48+
protected override def fieldFromString(f: String) = SubmitDriverResponseField.fromString(f)
4949
}

0 commit comments

Comments
 (0)