@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes
18
18
19
19
import java .io .File
20
20
import java .security .SecureRandom
21
- import java .util .concurrent .{Executors , TimeUnit }
21
+ import java .util .concurrent .{Executors , TimeoutException , TimeUnit }
22
22
import javax .net .ssl .X509TrustManager
23
23
24
24
import com .google .common .io .Files
@@ -34,7 +34,7 @@ import scala.concurrent.ExecutionContext
34
34
import scala .concurrent .duration .DurationInt
35
35
import scala .util .Success
36
36
37
- import org .apache .spark .{SPARK_VERSION , SparkConf }
37
+ import org .apache .spark .{SPARK_VERSION , SparkConf , SparkException }
38
38
import org .apache .spark .deploy .rest .{AppResource , KubernetesCreateSubmissionRequest , RemoteAppResource , TarGzippedData , UploadedAppResource }
39
39
import org .apache .spark .deploy .rest .kubernetes ._
40
40
import org .apache .spark .internal .Logging
@@ -130,8 +130,8 @@ private[spark] class Client(
130
130
val podWatcher = new Watcher [Pod ] {
131
131
override def eventReceived (action : Action , t : Pod ): Unit = {
132
132
if ((action == Action .ADDED || action == Action .MODIFIED )
133
- && t.getStatus.getPhase == " Running"
134
- && ! submitCompletedFuture.isDone) {
133
+ && t.getStatus.getPhase == " Running"
134
+ && ! submitCompletedFuture.isDone) {
135
135
t.getStatus
136
136
.getContainerStatuses
137
137
.asScala
@@ -216,8 +216,78 @@ private[spark] class Client(
216
216
.endContainer()
217
217
.endSpec()
218
218
.done()
219
- submitCompletedFuture.get(30 , TimeUnit .SECONDS )
220
- }
219
// Local import: NonFatal keeps fatal JVM errors (OutOfMemoryError, etc.) and
// control throwables propagating instead of being swallowed by the best-effort
// handlers below.
import scala.util.control.NonFatal

/**
 * Builds the human-readable failure report for a driver pod that did not become
 * ready within LAUNCH_TIMEOUT_SECONDS. Every field read from the pod is treated
 * as possibly null so that building the report can never itself throw and mask
 * the original timeout.
 */
def describeUnreadyDriverPod(pod: Pod): String = {
  val metadata = Option(pod.getMetadata)
  val podName = metadata.flatMap(m => Option(m.getName)).getOrElse(kubernetesAppId)
  val podNamespace = metadata.flatMap(m => Option(m.getNamespace)).getOrElse("<unknown>")
  val topLevelMessage = s"The driver pod with name $podName" +
    s" in namespace $podNamespace was not ready in" +
    s" $LAUNCH_TIMEOUT_SECONDS seconds."

  val podStatus = Option(pod.getStatus)
  val podStatusPhase = podStatus
    .flatMap(status => Option(status.getPhase))
    .map(phase => s"Latest phase from the pod is: $phase")
    .getOrElse("The pod had no final phase.")
  val podStatusMessage = podStatus
    .flatMap(status => Option(status.getMessage))
    .map(message => s"Latest message from the pod is: $message")
    .getOrElse("The pod had no final message.")

  val failedDriverContainerStatusString = podStatus
    .flatMap(status => Option(status.getContainerStatuses))
    .toList
    .flatMap(_.asScala)
    .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME)
    .map { containerStatus =>
      // Exactly one of running/waiting/terminated is expected to be populated;
      // anything else (including a missing state) is reported as Unknown.
      Option(containerStatus.getState) match {
        case Some(state) if state.getRunning != null =>
          "Driver container last state: Running\n" +
            s"Driver container started at: ${state.getRunning.getStartedAt}"
        case Some(state) if state.getWaiting != null =>
          "Driver container last state: Waiting\n" +
            s"Driver container wait reason: ${state.getWaiting.getReason}\n" +
            s"Driver container message: ${state.getWaiting.getMessage}\n"
        case Some(state) if state.getTerminated != null =>
          "Driver container last state: Terminated\n" +
            s"Driver container started at: ${state.getTerminated.getStartedAt}\n" +
            s"Driver container finished at: ${state.getTerminated.getFinishedAt}\n" +
            s"Driver container exit reason: ${state.getTerminated.getReason}\n" +
            s"Driver container exit code: ${state.getTerminated.getExitCode}\n" +
            s"Driver container message: ${state.getTerminated.getMessage}"
        case _ =>
          "Driver container last state: Unknown"
      }
    }
    .getOrElse("The driver container wasn't found in the pod; expected to find" +
      s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME")

  s"$topLevelMessage\n" +
    s"$podStatusPhase\n" +
    s"$podStatusMessage\n\n$failedDriverContainerStatusString"
}

// Set only after the wait returns normally. The finally block reads it so the
// driver pod is cleaned up on *any* failure path (timeout, execution failure,
// interruption), not just on the TimeoutException handled below.
var submitSucceeded = false
try {
  submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
  submitSucceeded = true
} catch {
  case e: TimeoutException =>
    // Fetch the pod's latest state to explain why it never became ready. The
    // lookup result is wrapped in Option because the pod may be absent by now.
    val driverPod = try {
      Option(kubernetesClient.pods().withName(kubernetesAppId).get())
    } catch {
      case NonFatal(throwable) =>
        logError(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS seconds for the" +
          " driver pod to start, but an error occurred while fetching the driver" +
          " pod's details.", throwable)
        val lookupFailure = new SparkException(
          s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS" +
            " seconds for the driver pod to start. Unfortunately, in attempting to fetch" +
            " the latest state of the pod, another error was thrown. Check the logs for" +
            " the error that was thrown in looking up the driver pod.", e)
        // Keep the lookup error attached so it is not only visible in the logs.
        lookupFailure.addSuppressed(throwable)
        throw lookupFailure
    }
    val finalErrorMessage = driverPod
      .map(describeUnreadyDriverPod)
      .getOrElse(s"The driver pod with name $kubernetesAppId was not ready in" +
        s" $LAUNCH_TIMEOUT_SECONDS seconds, and the pod could not be found when" +
        " looking up its latest state.")
    logError(finalErrorMessage, e)
    throw new SparkException(finalErrorMessage, e)
} finally {
  if (!submitSucceeded) {
    // Best-effort cleanup: a failed delete is logged rather than rethrown so it
    // does not hide the exception that caused the submission to fail.
    try {
      kubernetesClient.pods.withName(kubernetesAppId).delete
    } catch {
      case NonFatal(throwable) =>
        logError("Failed to delete driver pod after it failed to run.", throwable)
    }
  }
}
290
+ }
221
291
222
292
Utils .tryWithResource(kubernetesClient
223
293
.pods()
@@ -338,6 +408,7 @@ private object Client {
338
408
private val DRIVER_LAUNCHER_CONTAINER_NAME = " spark-kubernetes-driver-launcher"
339
409
private val SECURE_RANDOM = new SecureRandom ()
340
410
private val SPARK_SUBMISSION_SECRET_BASE_DIR = " /var/run/secrets/spark-submission"
411
// Number of seconds the client waits on submitCompletedFuture for the driver pod
// to start before the launch is reported as timed out.
+ private val LAUNCH_TIMEOUT_SECONDS = 30
341
412
342
413
def main (args : Array [String ]): Unit = {
343
414
require(args.length >= 2 , s " Too few arguments. Usage: ${getClass.getName} <mainAppResource> " +
0 commit comments