Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zwangsheng committed Mar 30, 2023
1 parent 10965d3 commit 8e09403
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1172,13 +1172,13 @@ object KyuubiConf {
.createWithDefault(false)

val KUBERNETES_INFORMER_PERIOD: ConfigEntry[Long] =
buildConf("kyuubi.kubernetes.informer.period")
buildConf("kyuubi.kubernetes.informer.resyncPeriod")
.doc("Kubernetes Informer poll driver pod period." +
"Set too small can lead to, stress on Kubernetes Api Server; " +
"Set too lager can lead to, app info can't updated in time ")
.version("1.8.0")
.timeConf
.checkValue(_ > 0, "must be positive number")
.checkValue(_ >= 0, "Invalid resync period provided, It should be a non-negative value")
.createWithDefault(Duration.ofSeconds(10).toMillis)

val KUBERNETES_INFORMER_CACHE_PERIOD: ConfigEntry[Long] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import io.fabric8.kubernetes.client.informers.SharedIndexInformer

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.util.KubernetesUtils

Expand All @@ -36,7 +36,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {

@volatile
private var kubernetesClient: KubernetesClient = _
private var driverInformer: SharedIndexInformer[Pod] = _
private var enginePodInformer: SharedIndexInformer[Pod] = _
private var submitTimeout: Long = _

private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
Expand All @@ -51,10 +51,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
// Using Kubernetes Informer to update application state
val informerPeriod = conf.get(KyuubiConf.KUBERNETES_INFORMER_PERIOD)
driverInformer = client.informers().sharedIndexInformerFor(
classOf[Pod],
informerPeriod)
driverInformer.addEventHandler(new SparkEnginePodEventHandler()).start()
enginePodInformer =
client.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).runnableInformer(informerPeriod)
enginePodInformer.addEventHandler(new SparkEnginePodEventHandler()).start()
info("Start Kubernetes Client Informer.")
// Using Cache help clean delete app info
val cachePeriod = conf.get(KyuubiConf.KUBERNETES_INFORMER_CACHE_PERIOD)
Expand Down Expand Up @@ -139,8 +138,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {

override def stop(): Unit = {
try {
if (driverInformer != null) {
driverInformer.stop()
if (enginePodInformer != null) {
enginePodInformer.stop()
}
if (kubernetesClient != null) {
kubernetesClient.close()
Expand All @@ -158,7 +157,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
pod.getMetadata.getLabels.containsKey(LABEL_KYUUBI_UNIQUE_KEY)
}

private def updateState(pod: Pod): Unit = {
private def updateApplicationState(pod: Pod): Unit = {
val metaData = pod.getMetadata
val state = toApplicationState(pod.getStatus.getPhase)
debug(s"Driver Informer change pod: ${metaData.getName} state: $state")
Expand All @@ -171,24 +170,24 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
error = Option(pod.getStatus.getReason)))
}

private def markDeleted(pod: Pod): Unit = {
private def markTerminated(pod: Pod): Unit = {
deletedAppInfoCache.put(
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
toApplicationState(pod.getStatus.getPhase))
}

override def onAdd(pod: Pod): Unit = {
if (isSparkEnginePod(pod)) {
updateState(pod)
updateApplicationState(pod)
}
}

override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
if (isSparkEnginePod(newPod)) {
updateState(newPod)
updateApplicationState(newPod)
toApplicationState(newPod.getStatus.getPhase) match {
case FINISHED | FAILED | UNKNOWN =>
markDeleted(newPod)
case state if isTerminated(state) =>
markTerminated(newPod)
case _ =>
// do nothing
}
Expand All @@ -197,8 +196,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {

override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = {
if (isSparkEnginePod(pod)) {
updateState(pod)
markDeleted(pod)
updateApplicationState(pod)
markTerminated(pod)
}
}
}
Expand Down

0 comments on commit 8e09403

Please sign in to comment.