From 4ab530e99b25557fd4222ce19a473eb093e70ee8 Mon Sep 17 00:00:00 2001
From: zwangsheng <2213335496@qq.com>
Date: Tue, 28 Mar 2023 13:41:17 +0800
Subject: [PATCH] [KYUUBI #4623][Improvement][K8S]
 kubernetesApplicationOperation Using Informer instead of list

---
 .../org/apache/kyuubi/config/KyuubiConf.scala |  10 ++
 .../kyuubi/engine/ApplicationOperation.scala  |  11 ++
 .../KubernetesApplicationOperation.scala      | 165 +++++++++++------
 3 files changed, 127 insertions(+), 59 deletions(-)

diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 5e775595257..c2e2f3e64bb 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -198,6 +198,8 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
  */
 object KyuubiConf {
 
+  import java.util.concurrent.TimeUnit
+
   /** a custom directory that contains the [[KYUUBI_CONF_FILE_NAME]] */
   final val KYUUBI_CONF_DIR = "KYUUBI_CONF_DIR"
 
@@ -1171,6 +1173,14 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_INFORMER_PERIOD: ConfigEntry[Long] =
+    buildConf("kyuubi.kubernetes.informer.period")
+      .doc("Resync period of the Kubernetes informer that tracks application pod state.")
+      .version("1.8.0")
+      .timeConf
+      .checkValue(_ > 0, "must be positive number")
+      .createWithDefault(Duration.ofSeconds(10).toMillis)
+
   // ///////////////////////////////////////////////////////////////////////////////////////////////
   //                                 SQL Engine Configuration                                    //
   // ///////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 00db372ce23..d878c8628f0 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine
 
+import io.fabric8.kubernetes.api.model.Pod
+
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.ApplicationState.ApplicationState
 
@@ -100,6 +102,15 @@ case class ApplicationInfo(
   }
 }
 
+object ApplicationInfo {
+  def notFound: ApplicationInfo = {
+    ApplicationInfo(
+      null,
+      null,
+      ApplicationState.NOT_FOUND)
+  }
+}
+
 object ApplicationOperation {
   val NOT_FOUND = "APPLICATION_NOT_FOUND"
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index d0820b9aeb5..b2de9cd707d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -17,30 +17,43 @@
 
 package org.apache.kyuubi.engine
 
-import java.util
+import java.util.concurrent.ConcurrentHashMap
 
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer
 
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, PENDING, RUNNING, UNKNOWN}
-import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, SPARK_APP_ID_LABEL}
+import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
+import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
 import org.apache.kyuubi.util.KubernetesUtils
 
 class KubernetesApplicationOperation extends ApplicationOperation with Logging {
 
   @volatile
   private var kubernetesClient: KubernetesClient = _
-
+  private var driverInformer: SharedIndexInformer[Pod] = _
   private var submitTimeout: Long = _
 
+  private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
+    new ConcurrentHashMap[String, ApplicationInfo]
+  private val deletedQueue: ConcurrentHashMap[String, Long] = new ConcurrentHashMap[String, Long]()
+
   override def initialize(conf: KyuubiConf): Unit = {
     info("Start initializing Kubernetes Client.")
     kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
       case Some(client) =>
         info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
         submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
+        // Using Kubernetes Informer to update application state
+        val informerPeriod = conf.get(KyuubiConf.KUBERNETES_INFORMER_PERIOD)
+        driverInformer = client.informers().sharedIndexInformerFor(
+          classOf[Pod],
+          informerPeriod)
+        driverInformer.addEventHandler(new DriverPodEventHandler()).start()
+        info("Start Kubernetes Client Informer.")
         client
       case None =>
         warn("Fail to init Kubernetes Client for Kubernetes Application Operation")
@@ -55,34 +68,25 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
   }
 
   override def killApplicationByTag(tag: String): KillResponse = {
-    if (kubernetesClient != null) {
-      debug(s"Deleting application info from Kubernetes cluster by $tag tag")
-      try {
-        // Need driver only
-        val podList = findDriverPodByTag(tag)
-        if (podList.size() != 0) {
-          val targetPod = podList.get(0)
-          toApplicationState(targetPod.getStatus.getPhase) match {
-            case FAILED | UNKNOWN =>
-              (
-                false,
-                s"Target Pod ${targetPod.getMetadata.getName} is in FAILED or UNKNOWN status")
-            case _ =>
-              (
-                !kubernetesClient.pods.withName(targetPod.getMetadata.getName).delete().isEmpty,
-                s"Operation of deleted appId: ${podList.get(0).getMetadata.getName} is completed")
-          }
-        } else {
+    if (kubernetesClient == null) {
+      throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+    }
+    debug(s"Deleting application info from Kubernetes cluster by $tag tag")
+    try {
+      val info = appInfoStore.getOrDefault(tag, ApplicationInfo.notFound)
+      info.state match {
+        case NOT_FOUND | FAILED | UNKNOWN =>
           (
             false,
-            s"Target Pod(tag: $tag) is not found, due to pod have been deleted or not created")
-        }
-      } catch {
-        case e: Exception =>
-          (false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
+            s"Target application[tag: $tag] is in ${info.state} status")
+        case _ =>
+          (
+            !kubernetesClient.pods.withName(info.name).delete().isEmpty,
+            s"Operation of deleted application[appId: ${info.id} ,tag: $tag] is completed")
       }
-    } else {
-      throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+    } catch {
+      case e: Exception =>
+        (false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
     }
   }
 
@@ -92,22 +96,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     }
     debug(s"Getting application info from Kubernetes cluster by $tag tag")
     try {
-      val podList = findDriverPodByTag(tag)
-      if (podList.size() != 0) {
-        val pod = podList.get(0)
-        val info = ApplicationInfo(
-          // spark pods always tag label `spark-app-selector:`
-          id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
-          name = pod.getMetadata.getName,
-          state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
-          error = Option(pod.getStatus.getReason))
-        debug(s"Successfully got application info by $tag: $info")
-        return info
-      }
-      // Kyuubi should wait second if pod is not be created
-      submitTime match {
-        case Some(time) =>
-          val elapsedTime = System.currentTimeMillis() - time
+      val info = appInfoStore.getOrDefault(tag, ApplicationInfo.notFound)
+      info.state match {
+        // Kyuubi should wait second if pod is not be created
+        case NOT_FOUND if submitTime.nonEmpty =>
+          val elapsedTime = System.currentTimeMillis() - submitTime.get
           if (elapsedTime > submitTimeout) {
             error(s"Can't find target driver pod by tag: $tag, " +
               s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
@@ -117,8 +110,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
               s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
             ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
           }
-        case None =>
+        case NOT_FOUND =>
           ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+        case _ =>
+          debug(s"Successfully got application info by $tag: $info")
+          info
       }
     } catch {
       case e: Exception =>
@@ -127,22 +123,73 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     }
   }
 
-  private def findDriverPodByTag(tag: String): util.List[Pod] = {
-    val podList = kubernetesClient.pods()
-      .withLabel(KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY, tag).list().getItems
-    val size = podList.size()
-    if (size != 1) {
-      warn(s"Get Tag: ${tag} Driver Pod In Kubernetes size: ${size}, we expect 1")
+  override def stop(): Unit = {
+    // Stop the informer before closing the client it was created from, and guard each step
+    // separately so that a failure in one does not leave the other resource open.
+    try {
+      if (driverInformer != null) {
+        driverInformer.stop()
+      }
+    } catch {
+      case e: Exception => error(e.getMessage)
+    }
+    try {
+      if (kubernetesClient != null) {
+        kubernetesClient.close()
+      }
+    } catch {
+      case e: Exception => error(e.getMessage)
     }
-    podList
   }
 
-  override def stop(): Unit = {
-    if (kubernetesClient != null) {
-      try {
-        kubernetesClient.close()
-      } catch {
-        case e: Exception => error(e.getMessage)
+  /**
+   * Informer callback that mirrors the state of every pod labelled with
+   * [[LABEL_KYUUBI_UNIQUE_KEY]] into `appInfoStore`, keyed by that label's value.
+   */
+  class DriverPodEventHandler extends ResourceEventHandler[Pod] {
+    // Pods without any label may report null labels, so guard before looking up the key.
+    private def filter(pod: Pod): Boolean = {
+      Option(pod.getMetadata.getLabels).exists(_.containsKey(LABEL_KYUUBI_UNIQUE_KEY))
+    }
+
+    // Record the pod's latest phase/reason under its Kyuubi tag.
+    private def updateState(pod: Pod): Unit = {
+      val metaData = pod.getMetadata
+      val state = toApplicationState(pod.getStatus.getPhase)
+      debug(s"Driver Informer change pod: ${metaData.getName} state: $state")
+      appInfoStore.put(
+        metaData.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
+        ApplicationInfo(
+          id = metaData.getLabels.get(SPARK_APP_ID_LABEL),
+          name = metaData.getName,
+          state = state,
+          error = Option(pod.getStatus.getReason)))
+    }
+
+    // Remember when the pod of a tag was deleted; keyed by tag so that two deletions within
+    // the same millisecond cannot overwrite each other.
+    private def markDeleted(pod: Pod): Unit = {
+      deletedQueue.put(
+        pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
+        System.currentTimeMillis())
+    }
+
+    override def onAdd(pod: Pod): Unit = {
+      if (filter(pod)) {
+        updateState(pod)
+      }
+    }
+
+    override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
+      if (filter(newPod)) {
+        updateState(newPod)
+      }
+    }
+
+    override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = {
+      if (filter(pod)) {
+        updateState(pod)
+        markDeleted(pod)
       }
     }
   }