Skip to content

Commit 408c65f

Browse files
mccheah authored and ash211 committed
Monitor pod status in submission v2. (apache#283)
* Monitor pod status in submission v2.

* Address comments
1 parent 8f6f0a0 commit 408c65f

File tree

4 files changed

+154
-57
lines changed

4 files changed

+154
-57
lines changed
Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,46 +14,50 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.deploy.kubernetes.submit.v1
17+
package org.apache.spark.deploy.kubernetes.submit
1818

19-
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
19+
import java.util.concurrent.{CountDownLatch, TimeUnit}
2020

21-
import io.fabric8.kubernetes.api.model.Pod
21+
import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
2222
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
2323
import io.fabric8.kubernetes.client.Watcher.Action
2424
import scala.collection.JavaConverters._
2525

26+
import org.apache.spark.SparkException
2627
import org.apache.spark.internal.Logging
2728
import org.apache.spark.util.ThreadUtils
2829

30+
private[kubernetes] trait LoggingPodStatusWatcher extends Watcher[Pod] {
31+
def awaitCompletion(): Unit
32+
}
33+
2934
/**
3035
* A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
3136
* every state change and also at an interval for liveness.
3237
*
33-
* @param podCompletedFuture a CountDownLatch that is set to true when the watched pod finishes
3438
* @param appId
35-
* @param interval ms between each state request. If set to 0 or a negative number, the periodic
36-
* logging will be disabled.
39+
* @param maybeLoggingInterval ms between each state request. If provided, must be a positive
40+
* number.
3741
*/
38-
private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
39-
appId: String,
40-
interval: Long)
41-
extends Watcher[Pod] with Logging {
42+
private[kubernetes] class LoggingPodStatusWatcherImpl(
43+
appId: String, maybeLoggingInterval: Option[Long])
44+
extends LoggingPodStatusWatcher with Logging {
4245

46+
private val podCompletedFuture = new CountDownLatch(1)
4347
// start timer for periodic logging
4448
private val scheduler =
4549
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
4650
private val logRunnable: Runnable = new Runnable {
4751
override def run() = logShortStatus()
4852
}
4953

50-
private var pod: Option[Pod] = Option.empty
51-
private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown")
52-
private def status: String = pod.map(_.getStatus().getContainerStatuses().toString())
53-
.getOrElse("unknown")
54+
private var pod = Option.empty[Pod]
55+
56+
private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
5457

5558
def start(): Unit = {
56-
if (interval > 0) {
59+
maybeLoggingInterval.foreach { interval =>
60+
require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
5761
scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
5862
}
5963
}
@@ -98,7 +102,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
98102
}
99103

100104
private def formatPodState(pod: Pod): String = {
101-
105+
// TODO include specific container state
102106
val details = Seq[(String, String)](
103107
// pod metadata
104108
("pod name", pod.getMetadata.getName()),
@@ -116,17 +120,59 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL
116120
("start time", pod.getStatus.getStartTime),
117121
("container images",
118122
pod.getStatus.getContainerStatuses()
119-
.asScala
120-
.map(_.getImage)
121-
.mkString(", ")),
123+
.asScala
124+
.map(_.getImage)
125+
.mkString(", ")),
122126
("phase", pod.getStatus.getPhase()),
123127
("status", pod.getStatus.getContainerStatuses().toString)
124128
)
129+
formatPairsBundle(details)
130+
}
125131

132+
private def formatPairsBundle(pairs: Seq[(String, String)]) = {
126133
// Use more loggable format if value is null or empty
127-
details.map { case (k, v) =>
128-
val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
129-
s"\n\t $k: $newValue"
134+
pairs.map {
135+
case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
130136
}.mkString("")
131137
}
138+
139+
override def awaitCompletion(): Unit = {
140+
podCompletedFuture.countDown()
141+
logInfo(pod.map { p =>
142+
s"Container final statuses:\n\n${containersDescription(p)}"
143+
}.getOrElse("No containers were found in the driver pod."))
144+
}
145+
146+
private def containersDescription(p: Pod): String = {
147+
p.getStatus.getContainerStatuses.asScala.map { status =>
148+
Seq(
149+
("Container name", status.getName),
150+
("Container image", status.getImage)) ++
151+
containerStatusDescription(status)
152+
}.map(formatPairsBundle).mkString("\n\n")
153+
}
154+
155+
private def containerStatusDescription(
156+
containerStatus: ContainerStatus): Seq[(String, String)] = {
157+
val state = containerStatus.getState
158+
Option(state.getRunning)
159+
.orElse(Option(state.getTerminated))
160+
.orElse(Option(state.getWaiting))
161+
.map {
162+
case running: ContainerStateRunning =>
163+
Seq(
164+
("Container state", "Running"),
165+
("Container started at", running.getStartedAt))
166+
case waiting: ContainerStateWaiting =>
167+
Seq(
168+
("Container state", "Waiting"),
169+
("Pending reason", waiting.getReason))
170+
case terminated: ContainerStateTerminated =>
171+
Seq(
172+
("Container state", "Terminated"),
173+
("Exit code", terminated.getExitCode.toString))
174+
case unknown =>
175+
throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
176+
}.getOrElse(Seq(("Container state", "N/A")))
177+
}
132178
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException}
3333
import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials}
3434
import org.apache.spark.deploy.kubernetes.config._
3535
import org.apache.spark.deploy.kubernetes.constants._
36-
import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils}
36+
import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils, LoggingPodStatusWatcherImpl}
3737
import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
3838
import org.apache.spark.internal.Logging
3939
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -83,7 +83,9 @@ private[spark] class Client(
8383
MEMORY_OVERHEAD_MIN))
8484
private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb
8585

86-
private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
86+
private val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
87+
private val loggingInterval = Some(sparkConf.get(REPORT_INTERVAL))
88+
.filter( _ => waitForAppCompletion)
8789

8890
private val secretBase64String = {
8991
val secretBytes = new Array[Byte](128)
@@ -147,10 +149,8 @@ private[spark] class Client(
147149
driverServiceManager.start(kubernetesClient, kubernetesAppId, sparkConf)
148150
// start outer watch for status logging of driver pod
149151
// only enable interval logging if in waitForAppCompletion mode
150-
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
151-
val driverPodCompletedLatch = new CountDownLatch(1)
152-
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
153-
loggingInterval)
152+
val loggingWatch = new LoggingPodStatusWatcherImpl(
153+
kubernetesAppId, loggingInterval)
154154
Utils.tryWithResource(kubernetesClient
155155
.pods()
156156
.withName(kubernetesDriverPodName)
@@ -230,7 +230,7 @@ private[spark] class Client(
230230
// wait if configured to do so
231231
if (waitForAppCompletion) {
232232
logInfo(s"Waiting for application $kubernetesAppId to finish...")
233-
driverPodCompletedLatch.await()
233+
loggingWatch.awaitCompletion()
234234
logInfo(s"Application $kubernetesAppId finished.")
235235
} else {
236236
logInfo(s"Application $kubernetesAppId successfully launched.")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
2525
import org.apache.spark.{SparkConf, SparkException}
2626
import org.apache.spark.deploy.kubernetes.config._
2727
import org.apache.spark.deploy.kubernetes.constants._
28+
import org.apache.spark.deploy.kubernetes.submit.{LoggingPodStatusWatcher, LoggingPodStatusWatcherImpl}
2829
import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
2930
import org.apache.spark.internal.Logging
3031
import org.apache.spark.launcher.SparkLauncher
@@ -48,9 +49,11 @@ private[spark] class Client(
4849
appArgs: Array[String],
4950
sparkJars: Seq[String],
5051
sparkFiles: Seq[String],
52+
waitForAppCompletion: Boolean,
5153
kubernetesClientProvider: SubmissionKubernetesClientProvider,
5254
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
53-
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider)
55+
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
56+
loggingPodStatusWatcher: LoggingPodStatusWatcher)
5457
extends Logging {
5558

5659
private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
@@ -186,27 +189,40 @@ private[spark] class Client(
186189
.endContainer()
187190
.endSpec()
188191
.build()
189-
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
190-
try {
191-
val driverOwnedResources = Seq(initContainerConfigMap) ++
192-
maybeSubmittedDependenciesSecret.toSeq ++
193-
credentialsSecret.toSeq
194-
val driverPodOwnerReference = new OwnerReferenceBuilder()
195-
.withName(createdDriverPod.getMetadata.getName)
196-
.withApiVersion(createdDriverPod.getApiVersion)
197-
.withUid(createdDriverPod.getMetadata.getUid)
198-
.withKind(createdDriverPod.getKind)
199-
.withController(true)
200-
.build()
201-
driverOwnedResources.foreach { resource =>
202-
val originalMetadata = resource.getMetadata
203-
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
192+
Utils.tryWithResource(
193+
kubernetesClient
194+
.pods()
195+
.withName(resolvedDriverPod.getMetadata.getName)
196+
.watch(loggingPodStatusWatcher)) { _ =>
197+
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
198+
try {
199+
val driverOwnedResources = Seq(initContainerConfigMap) ++
200+
maybeSubmittedDependenciesSecret.toSeq ++
201+
credentialsSecret.toSeq
202+
val driverPodOwnerReference = new OwnerReferenceBuilder()
203+
.withName(createdDriverPod.getMetadata.getName)
204+
.withApiVersion(createdDriverPod.getApiVersion)
205+
.withUid(createdDriverPod.getMetadata.getUid)
206+
.withKind(createdDriverPod.getKind)
207+
.withController(true)
208+
.build()
209+
driverOwnedResources.foreach { resource =>
210+
val originalMetadata = resource.getMetadata
211+
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
212+
}
213+
kubernetesClient.resourceList(driverOwnedResources: _*).createOrReplace()
214+
} catch {
215+
case e: Throwable =>
216+
kubernetesClient.pods().delete(createdDriverPod)
217+
throw e
218+
}
219+
if (waitForAppCompletion) {
220+
logInfo(s"Waiting for application $kubernetesAppId to finish...")
221+
loggingPodStatusWatcher.awaitCompletion()
222+
logInfo(s"Application $kubernetesAppId finished.")
223+
} else {
224+
logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.")
204225
}
205-
kubernetesClient.resourceList(driverOwnedResources: _*).createOrReplace()
206-
} catch {
207-
case e: Throwable =>
208-
kubernetesClient.pods().delete(createdDriverPod)
209-
throw e
210226
}
211227
}
212228
}
@@ -274,6 +290,9 @@ private[spark] object Client {
274290
val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
275291
val kubernetesCredentialsMounterProvider =
276292
new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
293+
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
294+
val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter( _ => waitForAppCompletion)
295+
val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
277296
new Client(
278297
appName,
279298
kubernetesAppId,
@@ -282,8 +301,10 @@ private[spark] object Client {
282301
appArgs,
283302
sparkJars,
284303
sparkFiles,
304+
waitForAppCompletion,
285305
kubernetesClientProvider,
286306
initContainerComponentsProvider,
287-
kubernetesCredentialsMounterProvider).run()
307+
kubernetesCredentialsMounterProvider,
308+
loggingPodStatusWatcher).run()
288309
}
289310
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes.submit.v2
1919
import java.io.File
2020

2121
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
22-
import io.fabric8.kubernetes.client.KubernetesClient
22+
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
2323
import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
2424
import org.hamcrest.{BaseMatcher, Description}
2525
import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
@@ -35,6 +35,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
3535
import org.apache.spark.deploy.kubernetes.SparkPodInitContainerBootstrap
3636
import org.apache.spark.deploy.kubernetes.config._
3737
import org.apache.spark.deploy.kubernetes.constants._
38+
import org.apache.spark.deploy.kubernetes.submit.LoggingPodStatusWatcher
3839

3940
class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
4041
private val JARS_RESOURCE = SubmittedResourceIdAndSecret("jarsId", "jarsSecret")
@@ -59,13 +60,13 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
5960
private val SPARK_JARS = Seq(
6061
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
6162
private val RESOLVED_SPARK_JARS = Seq(
62-
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///var/data/spark-jars/jar2.jar")
63+
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///var/data/spark-jars/jar2.jar")
6364
private val RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS = Seq(
64-
"/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar")
65+
"/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar")
6566
private val SPARK_FILES = Seq(
66-
"hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
67+
"hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
6768
private val RESOLVED_SPARK_FILES = Seq(
68-
"hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt")
69+
"hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt")
6970
private val INIT_CONTAINER_SECRET = new SecretBuilder()
7071
.withNewMetadata()
7172
.withName(INIT_CONTAINER_SECRET_NAME)
@@ -140,6 +141,12 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
140141
private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _
141142
@Mock
142143
private var credentialsMounter: DriverPodKubernetesCredentialsMounter = _
144+
@Mock
145+
private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
146+
@Mock
147+
private var namedPodResource: PodResource[Pod, DoneablePod] = _
148+
@Mock
149+
private var watch: Watch = _
143150

144151
before {
145152
MockitoAnnotations.initMocks(this)
@@ -177,6 +184,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
177184
.build()
178185
}
179186
})
187+
when(podOps.withName(APP_ID)).thenReturn(namedPodResource)
188+
when(namedPodResource.watch(loggingPodStatusWatcher)).thenReturn(watch)
180189
when(containerLocalizedFilesResolver.resolveSubmittedAndRemoteSparkJars())
181190
.thenReturn(RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS)
182191
when(containerLocalizedFilesResolver.resolveSubmittedSparkJars())
@@ -278,6 +287,25 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
278287
})
279288
}
280289

290+
test("Waiting for completion should await completion on the status watcher.") {
291+
expectationsForNoMountedCredentials()
292+
expectationsForNoDependencyUploader()
293+
new Client(
294+
APP_NAME,
295+
APP_ID,
296+
MAIN_CLASS,
297+
SPARK_CONF,
298+
APP_ARGS,
299+
SPARK_JARS,
300+
SPARK_FILES,
301+
true,
302+
kubernetesClientProvider,
303+
initContainerComponentsProvider,
304+
credentialsMounterProvider,
305+
loggingPodStatusWatcher).run()
306+
verify(loggingPodStatusWatcher).awaitCompletion()
307+
}
308+
281309
private def expectationsForNoDependencyUploader(): Unit = {
282310
when(initContainerComponentsProvider
283311
.provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS))
@@ -353,9 +381,11 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
353381
APP_ARGS,
354382
SPARK_JARS,
355383
SPARK_FILES,
384+
false,
356385
kubernetesClientProvider,
357386
initContainerComponentsProvider,
358-
credentialsMounterProvider).run()
387+
credentialsMounterProvider,
388+
loggingPodStatusWatcher).run()
359389
val podMatcher = new BaseMatcher[Pod] {
360390
override def matches(o: scala.Any): Boolean = {
361391
o match {

0 commit comments

Comments (0)