Skip to content

Commit

Permalink
Reorder func & slow get app info
Browse files Browse the repository at this point in the history
  • Loading branch information
zwangsheng committed Mar 30, 2023
1 parent 22d9c16 commit 28f9a70
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ class KyuubiOperationKubernetesClusterClusterModeSuite

sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]

eventually(timeout(3.minutes), interval(50.milliseconds)) {
// wait for driver pod start
eventually(timeout(3.minutes), interval(5.second)) {
// trigger k8sOperation init here
val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
assert(appInfo.state == RUNNING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,28 +153,6 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}

private class SparkEnginePodEventHandler extends ResourceEventHandler[Pod] {
private def isSparkEnginePod(pod: Pod): Boolean = {
pod.getMetadata.getLabels.containsKey(LABEL_KYUUBI_UNIQUE_KEY)
}

private def updateApplicationState(pod: Pod): Unit = {
val metaData = pod.getMetadata
val state = toApplicationState(pod.getStatus.getPhase)
debug(s"Driver Informer change pod: ${metaData.getName} state: $state")
appInfoStore.put(
metaData.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
ApplicationInfo(
id = metaData.getLabels.get(SPARK_APP_ID_LABEL),
name = metaData.getName,
state = state,
error = Option(pod.getStatus.getReason)))
}

private def markTerminated(pod: Pod): Unit = {
deletedAppInfoCache.put(
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
toApplicationState(pod.getStatus.getPhase))
}

override def onAdd(pod: Pod): Unit = {
if (isSparkEnginePod(pod)) {
Expand All @@ -201,6 +179,29 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
}
}

private def isSparkEnginePod(pod: Pod): Boolean = {
pod.getMetadata.getLabels.containsKey(LABEL_KYUUBI_UNIQUE_KEY)
}

private def updateApplicationState(pod: Pod): Unit = {
val metaData = pod.getMetadata
val state = toApplicationState(pod.getStatus.getPhase)
debug(s"Driver Informer change pod: ${metaData.getName} state: $state")
appInfoStore.put(
metaData.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
ApplicationInfo(
id = metaData.getLabels.get(SPARK_APP_ID_LABEL),
name = metaData.getName,
state = state,
error = Option(pod.getStatus.getReason)))
}

private def markTerminated(pod: Pod): Unit = {
deletedAppInfoCache.put(
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
toApplicationState(pod.getStatus.getPhase))
}
}

object KubernetesApplicationOperation extends Logging {
Expand Down

0 comments on commit 28f9a70

Please sign in to comment.