Skip to content

Commit b95d7a6

Browse files
committed
improve
1 parent cc8d2c7 commit b95d7a6

File tree

2 files changed

+41
-48
lines changed

2 files changed

+41
-48
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,8 @@ case class ApplicationInfo(
101101
}
102102

103103
object ApplicationInfo {
104-
def notFound: ApplicationInfo = {
105-
ApplicationInfo(
106-
null,
107-
null,
108-
ApplicationState.NOT_FOUND)
109-
}
104+
val NOT_FOUND: ApplicationInfo = ApplicationInfo(null, null, ApplicationState.NOT_FOUND)
105+
val UNKNOWN: ApplicationInfo = ApplicationInfo(null, null, ApplicationState.UNKNOWN)
110106
}
111107

112108
object ApplicationOperation {

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala

Lines changed: 39 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
2222
import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
2323
import io.fabric8.kubernetes.api.model.Pod
2424
import io.fabric8.kubernetes.client.KubernetesClient
25-
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
26-
import io.fabric8.kubernetes.client.informers.SharedIndexInformer
25+
import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndexInformer}
2726

2827
import org.apache.kyuubi.Logging
2928
import org.apache.kyuubi.config.KyuubiConf
@@ -48,22 +47,22 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
4847
case Some(client) =>
4948
info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
5049
submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
51-
// Using Kubernetes Informer to update application state
52-
// Set 0 for no resync, see more details in
53-
// https://github.com/fabric8io/kubernetes-client/discussions/5015
54-
enginePodInformer =
55-
client.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).runnableInformer(0)
56-
enginePodInformer.addEventHandler(new SparkEnginePodEventHandler()).start()
50+
// Disable resync, see https://github.com/fabric8io/kubernetes-client/discussions/5015
51+
enginePodInformer = client.pods()
52+
.withLabel(LABEL_KYUUBI_UNIQUE_KEY)
53+
.inform(new SparkEnginePodEventHandler)
5754
info("Start Kubernetes Client Informer.")
58-
// Using Cache help clean delete app info
59-
val cachePeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
55+
// Defer cleaning up terminated application information until its cache key expires
56+
// Use a Cache to help clean up deleted application info
57+
val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
6058
deletedAppInfoCache = CacheBuilder
6159
.newBuilder()
62-
.expireAfterWrite(cachePeriod, TimeUnit.MILLISECONDS)
60+
.expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
6361
.removalListener((notification: RemovalNotification[String, ApplicationState]) => {
64-
debug(s"Remove cached appInfo[tag: ${notification.getKey}], " +
65-
s"due to app state: ${notification.getValue}.")
66-
appInfoStore.remove(notification.getKey)
62+
Option(appInfoStore.remove(notification.getKey)).foreach { removed =>
63+
info(s"Remove terminated application ${removed.id} with " +
64+
s"tag ${notification.getKey} and state ${removed.state}")
65+
}
6766
})
6867
.build()
6968
client
@@ -85,8 +84,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
8584
}
8685
debug(s"Deleting application info from Kubernetes cluster by $tag tag")
8786
try {
88-
val info = appInfoStore.getOrDefault(tag, ApplicationInfo.notFound)
89-
debug(s"Application info[tag: ${tag}] is in ${info.state}")
87+
val info = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
88+
debug(s"Application info[tag: $tag] is in ${info.state}")
9089
info.state match {
9190
case NOT_FOUND | FAILED | UNKNOWN =>
9291
(
@@ -109,30 +108,30 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
109108
}
110109
debug(s"Getting application info from Kubernetes cluster by $tag tag")
111110
try {
112-
val info = appInfoStore.getOrDefault(tag, ApplicationInfo.notFound)
113-
info.state match {
111+
val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
112+
(appInfo.state, submitTime) match {
114113
// Kyuubi should wait a while if the pod has not been created yet
115-
case NOT_FOUND if submitTime.nonEmpty =>
116-
val elapsedTime = System.currentTimeMillis() - submitTime.get
114+
case (NOT_FOUND, Some(_submitTime)) =>
115+
val elapsedTime = System.currentTimeMillis - _submitTime
117116
if (elapsedTime > submitTimeout) {
118117
error(s"Can't find target driver pod by tag: $tag, " +
119118
s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
120-
ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
119+
ApplicationInfo.NOT_FOUND
121120
} else {
122121
warn("Wait for driver pod to be created, " +
123122
s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
124-
ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
123+
ApplicationInfo.UNKNOWN
125124
}
126-
case NOT_FOUND =>
127-
ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
125+
case (NOT_FOUND, None) =>
126+
ApplicationInfo.NOT_FOUND
128127
case _ =>
129-
debug(s"Successfully got application info by $tag: $info")
130-
info
128+
debug(s"Successfully got application info by $tag: $appInfo")
129+
appInfo
131130
}
132131
} catch {
133132
case e: Exception =>
134133
error(s"Failed to get application with $tag, due to ${e.getMessage}")
135-
ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
134+
ApplicationInfo.NOT_FOUND
136135
}
137136
}
138137

@@ -163,41 +162,39 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
163162
override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
164163
if (isSparkEnginePod(newPod)) {
165164
updateApplicationState(newPod)
166-
toApplicationState(newPod.getStatus.getPhase) match {
167-
case state if isTerminated(state) =>
168-
markTerminated(newPod)
169-
case _ =>
170-
// do nothing
165+
val appState = toApplicationState(newPod.getStatus.getPhase)
166+
if (isTerminated(appState)) {
167+
markApplicationTerminated(newPod)
171168
}
172169
}
173170
}
174171

175172
override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = {
176173
if (isSparkEnginePod(pod)) {
177174
updateApplicationState(pod)
178-
markTerminated(pod)
175+
markApplicationTerminated(pod)
179176
}
180177
}
181178
}
182179

183180
private def isSparkEnginePod(pod: Pod): Boolean = {
184-
pod.getMetadata.getLabels.containsKey(LABEL_KYUUBI_UNIQUE_KEY)
181+
val labels = pod.getMetadata.getLabels
182+
labels.containsKey(LABEL_KYUUBI_UNIQUE_KEY) && labels.containsKey(SPARK_APP_ID_LABEL)
185183
}
186184

187185
private def updateApplicationState(pod: Pod): Unit = {
188-
val metaData = pod.getMetadata
189-
val state = toApplicationState(pod.getStatus.getPhase)
190-
debug(s"Driver Informer change pod: ${metaData.getName} state: $state")
186+
val appState = toApplicationState(pod.getStatus.getPhase)
187+
debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState")
191188
appInfoStore.put(
192-
metaData.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
189+
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
193190
ApplicationInfo(
194-
id = metaData.getLabels.get(SPARK_APP_ID_LABEL),
195-
name = metaData.getName,
196-
state = state,
191+
id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
192+
name = pod.getMetadata.getName,
193+
state = appState,
197194
error = Option(pod.getStatus.getReason)))
198195
}
199196

200-
private def markTerminated(pod: Pod): Unit = {
197+
private def markApplicationTerminated(pod: Pod): Unit = {
201198
deletedAppInfoCache.put(
202199
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
203200
toApplicationState(pod.getStatus.getPhase))

0 commit comments

Comments
 (0)