Skip to content

Commit fae76a0

Browse files
committed
Introduce blocking submit to kubernetes by default (apache#53)
* Introduce blocking submit to kubernetes by default Two new configuration settings: - spark.kubernetes.submit.waitAppCompletion - spark.kubernetes.report.interval * Minor touchups * More succinct logging for pod state * Fix import order * Switch to watch-based logging * Spaces in comma-joined volumes, labels, and containers * Use CountDownLatch instead of SettableFuture * Match parallel ConfigBuilder style * Disable logging in fire-and-forget mode Which is enabled with spark.kubernetes.submit.waitAppCompletion=false (default: true) * Additional log line for when application is launched * Minor wording changes * More logging * Drop log to DEBUG
1 parent de9a82e commit fae76a0

File tree

3 files changed

+225
-62
lines changed

3 files changed

+225
-62
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 92 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ package org.apache.spark.deploy.kubernetes
1818

1919
import java.io.{File, FileInputStream}
2020
import java.security.{KeyStore, SecureRandom}
21-
import java.util.concurrent.{TimeoutException, TimeUnit}
21+
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
2222
import java.util.concurrent.atomic.AtomicBoolean
2323
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
2424

2525
import com.google.common.base.Charsets
2626
import com.google.common.io.Files
2727
import com.google.common.util.concurrent.SettableFuture
2828
import io.fabric8.kubernetes.api.model._
29-
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
29+
import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
3030
import io.fabric8.kubernetes.client.Watcher.Action
3131
import org.apache.commons.codec.binary.Base64
3232
import scala.collection.JavaConverters._
@@ -67,6 +67,8 @@ private[spark] class Client(
6767
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
6868
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
6969

70+
private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
71+
7072
private val secretBase64String = {
7173
val secretBytes = new Array[Byte](128)
7274
SECURE_RANDOM.nextBytes(secretBytes)
@@ -81,9 +83,11 @@ private[spark] class Client(
8183
ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures"))
8284

8385
def run(): Unit = {
86+
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
8487
val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
88+
8589
val parsedCustomLabels = parseCustomLabels(customLabels)
86-
var k8ConfBuilder = new ConfigBuilder()
90+
var k8ConfBuilder = new K8SConfigBuilder()
8791
.withApiVersion("v1")
8892
.withMasterUrl(master)
8993
.withNamespace(namespace)
@@ -116,73 +120,97 @@ private[spark] class Client(
116120
SPARK_APP_NAME_LABEL -> appName)
117121
++ parsedCustomLabels).asJava
118122
val containerPorts = buildContainerPorts()
119-
val submitCompletedFuture = SettableFuture.create[Boolean]
120-
val submitPending = new AtomicBoolean(false)
121-
val podWatcher = new DriverPodWatcher(
122-
submitCompletedFuture,
123-
submitPending,
124-
kubernetesClient,
125-
driverSubmitSslOptions,
126-
Array(submitServerSecret) ++ sslSecrets,
127-
driverKubernetesSelectors)
123+
124+
// start outer watch for status logging of driver pod
125+
val driverPodCompletedLatch = new CountDownLatch(1)
126+
// only enable interval logging if in waitForAppCompletion mode
127+
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
128+
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
129+
loggingInterval)
128130
Utils.tryWithResource(kubernetesClient
129131
.pods()
130132
.withLabels(driverKubernetesSelectors)
131-
.watch(podWatcher)) { _ =>
132-
kubernetesClient.pods().createNew()
133-
.withNewMetadata()
134-
.withName(kubernetesAppId)
133+
.watch(loggingWatch)) { _ =>
134+
135+
// launch driver pod with inner watch to upload jars when it's ready
136+
val submitCompletedFuture = SettableFuture.create[Boolean]
137+
val submitPending = new AtomicBoolean(false)
138+
val podWatcher = new DriverPodWatcher(
139+
submitCompletedFuture,
140+
submitPending,
141+
kubernetesClient,
142+
driverSubmitSslOptions,
143+
Array(submitServerSecret) ++ sslSecrets,
144+
driverKubernetesSelectors)
145+
Utils.tryWithResource(kubernetesClient
146+
.pods()
135147
.withLabels(driverKubernetesSelectors)
136-
.endMetadata()
137-
.withNewSpec()
138-
.withRestartPolicy("OnFailure")
139-
.addNewVolume()
140-
.withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
141-
.withNewSecret()
142-
.withSecretName(submitServerSecret.getMetadata.getName)
143-
.endSecret()
144-
.endVolume
145-
.addToVolumes(sslVolumes: _*)
146-
.withServiceAccount(serviceAccount)
147-
.addNewContainer()
148-
.withName(DRIVER_CONTAINER_NAME)
149-
.withImage(driverDockerImage)
150-
.withImagePullPolicy("IfNotPresent")
151-
.addNewVolumeMount()
148+
.watch(podWatcher)) { _ =>
149+
kubernetesClient.pods().createNew()
150+
.withNewMetadata()
151+
.withName(kubernetesAppId)
152+
.withLabels(driverKubernetesSelectors)
153+
.endMetadata()
154+
.withNewSpec()
155+
.withRestartPolicy("OnFailure")
156+
.addNewVolume()
152157
.withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
153-
.withMountPath(secretDirectory)
154-
.withReadOnly(true)
155-
.endVolumeMount()
156-
.addToVolumeMounts(sslVolumeMounts: _*)
157-
.addNewEnv()
158-
.withName(ENV_SUBMISSION_SECRET_LOCATION)
159-
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
160-
.endEnv()
161-
.addNewEnv()
162-
.withName(ENV_SUBMISSION_SERVER_PORT)
163-
.withValue(SUBMISSION_SERVER_PORT.toString)
164-
.endEnv()
165-
.addToEnv(sslEnvs: _*)
166-
.withPorts(containerPorts.asJava)
167-
.endContainer()
168-
.endSpec()
169-
.done()
170-
var submitSucceeded = false
171-
try {
172-
submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
173-
submitSucceeded = true
174-
} catch {
175-
case e: TimeoutException =>
176-
val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e)
177-
logError(finalErrorMessage, e)
178-
throw new SparkException(finalErrorMessage, e)
179-
} finally {
180-
if (!submitSucceeded) {
181-
Utils.tryLogNonFatalError {
182-
kubernetesClient.pods.withName(kubernetesAppId).delete()
158+
.withNewSecret()
159+
.withSecretName(submitServerSecret.getMetadata.getName)
160+
.endSecret()
161+
.endVolume
162+
.addToVolumes(sslVolumes: _*)
163+
.withServiceAccount(serviceAccount)
164+
.addNewContainer()
165+
.withName(DRIVER_CONTAINER_NAME)
166+
.withImage(driverDockerImage)
167+
.withImagePullPolicy("IfNotPresent")
168+
.addNewVolumeMount()
169+
.withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
170+
.withMountPath(secretDirectory)
171+
.withReadOnly(true)
172+
.endVolumeMount()
173+
.addToVolumeMounts(sslVolumeMounts: _*)
174+
.addNewEnv()
175+
.withName(ENV_SUBMISSION_SECRET_LOCATION)
176+
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
177+
.endEnv()
178+
.addNewEnv()
179+
.withName(ENV_SUBMISSION_SERVER_PORT)
180+
.withValue(SUBMISSION_SERVER_PORT.toString)
181+
.endEnv()
182+
.addToEnv(sslEnvs: _*)
183+
.withPorts(containerPorts.asJava)
184+
.endContainer()
185+
.endSpec()
186+
.done()
187+
var submitSucceeded = false
188+
try {
189+
submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
190+
submitSucceeded = true
191+
logInfo(s"Finished launching local resources to application $kubernetesAppId")
192+
} catch {
193+
case e: TimeoutException =>
194+
val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e)
195+
logError(finalErrorMessage, e)
196+
throw new SparkException(finalErrorMessage, e)
197+
} finally {
198+
if (!submitSucceeded) {
199+
Utils.tryLogNonFatalError {
200+
kubernetesClient.pods.withName(kubernetesAppId).delete()
201+
}
183202
}
184203
}
185204
}
205+
206+
// wait if configured to do so
207+
if (waitForAppCompletion) {
208+
logInfo(s"Waiting for application $kubernetesAppId to finish...")
209+
driverPodCompletedLatch.await()
210+
logInfo(s"Application $kubernetesAppId finished.")
211+
} else {
212+
logInfo(s"Application $kubernetesAppId successfully launched.")
213+
}
186214
}
187215
} finally {
188216
Utils.tryLogNonFatalError {
@@ -377,6 +405,8 @@ private[spark] class Client(
377405
Future {
378406
sparkConf.set("spark.driver.host", pod.getStatus.getPodIP)
379407
val submitRequest = buildSubmissionRequest()
408+
logInfo(s"Submitting local resources to driver pod for application " +
409+
s"$kubernetesAppId ...")
380410
driverSubmitter.submitApplication(submitRequest)
381411
}
382412
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
20+
21+
import scala.collection.JavaConverters._
22+
23+
import io.fabric8.kubernetes.api.model.Pod
24+
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
25+
import io.fabric8.kubernetes.client.Watcher.Action
26+
27+
import org.apache.spark.internal.Logging
28+
29+
/**
30+
* A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
31+
* every state change and also at an interval for liveness.
32+
*
33+
* @param podCompletedFuture a CountDownLatch that is set to true when the watched pod finishes
34+
* @param appId
35+
* @param interval ms between each state request. If set to 0 or a negative number, the periodic
36+
* logging will be disabled.
37+
*/
38+
private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
39+
appId: String,
40+
interval: Long)
41+
extends Watcher[Pod] with Logging {
42+
43+
// start timer for periodic logging
44+
private val scheduler = Executors.newScheduledThreadPool(1)
45+
private val logRunnable: Runnable = new Runnable {
46+
override def run() = logShortStatus()
47+
}
48+
if (interval > 0) {
49+
scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
50+
}
51+
52+
private var pod: Option[Pod] = Option.empty
53+
private var prevPhase: String = null
54+
private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown")
55+
56+
override def eventReceived(action: Action, pod: Pod): Unit = {
57+
this.pod = Option(pod)
58+
59+
logShortStatus()
60+
if (prevPhase != phase) {
61+
logLongStatus()
62+
}
63+
prevPhase = phase
64+
65+
if (phase == "Succeeded" || phase == "Failed") {
66+
podCompletedFuture.countDown()
67+
}
68+
}
69+
70+
override def onClose(e: KubernetesClientException): Unit = {
71+
scheduler.shutdown()
72+
logDebug(s"Stopped watching application $appId with last-observed phase $phase")
73+
}
74+
75+
private def logShortStatus() = {
76+
logInfo(s"Application status for $appId (phase: $phase)")
77+
}
78+
79+
private def logLongStatus() = {
80+
logInfo("Phase changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown"))
81+
}
82+
83+
private def formatPodState(pod: Pod): String = {
84+
85+
val details = Seq[(String, String)](
86+
// pod metadata
87+
("pod name", pod.getMetadata.getName()),
88+
("namespace", pod.getMetadata.getNamespace()),
89+
("labels", pod.getMetadata.getLabels().asScala.mkString(", ")),
90+
("pod uid", pod.getMetadata.getUid),
91+
("creation time", pod.getMetadata.getCreationTimestamp()),
92+
93+
// spec details
94+
("service account name", pod.getSpec.getServiceAccountName()),
95+
("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(", ")),
96+
("node name", pod.getSpec.getNodeName()),
97+
98+
// status
99+
("start time", pod.getStatus.getStartTime),
100+
("container images",
101+
pod.getStatus.getContainerStatuses()
102+
.asScala
103+
.map(_.getImage)
104+
.mkString(", ")),
105+
("phase", pod.getStatus.getPhase())
106+
)
107+
108+
// Use more loggable format if value is null or empty
109+
details.map { case (k, v) =>
110+
val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
111+
s"\n\t $k: $newValue"
112+
}.mkString("")
113+
}
114+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,4 +188,23 @@ package object config {
188188
.internal()
189189
.stringConf
190190
.createOptional
191+
192+
private[spark] val WAIT_FOR_APP_COMPLETION =
193+
ConfigBuilder("spark.kubernetes.submit.waitAppCompletion")
194+
.doc(
195+
"""
196+
| In cluster mode, whether to wait for the application to finish before exiting the
197+
| launcher process.
198+
""".stripMargin)
199+
.booleanConf
200+
.createWithDefault(true)
201+
202+
private[spark] val REPORT_INTERVAL =
203+
ConfigBuilder("spark.kubernetes.report.interval")
204+
.doc(
205+
"""
206+
| Interval between reports of the current app status in cluster mode.
207+
""".stripMargin)
208+
.timeConf(TimeUnit.MILLISECONDS)
209+
.createWithDefaultString("1s")
191210
}

0 commit comments

Comments
 (0)