Skip to content

Commit

Permalink
[KYUUBI apache#4623][Improvement][K8S] kubernetesApplicationOperation…
Browse files Browse the repository at this point in the history
… Using Informer instead of list
  • Loading branch information
zwangsheng committed Mar 28, 2023
1 parent 6d8fe6c commit 4ab530e
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
*/
object KyuubiConf {

import java.util.concurrent.TimeUnit

/** a custom directory that contains the [[KYUUBI_CONF_FILE_NAME]] */
final val KYUUBI_CONF_DIR = "KYUUBI_CONF_DIR"

Expand Down Expand Up @@ -1171,6 +1173,14 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)

val KUBERNETES_INFORMER_PERIOD: ConfigEntry[Long] =
buildConf("kyuubi.kubernetes.informer.period")
.doc("")
.version("1.8.0")
.timeConf
.checkValue(_ > 0, "must be positive number")
.createWithDefault(Duration.ofSeconds(10).toMillis)

// ///////////////////////////////////////////////////////////////////////////////////////////////
// SQL Engine Configuration //
// ///////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kyuubi.engine

import io.fabric8.kubernetes.api.model.Pod

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.ApplicationState

Expand Down Expand Up @@ -100,6 +102,15 @@ case class ApplicationInfo(
}
}

object ApplicationInfo {
def notFound: ApplicationInfo = {
ApplicationInfo(
null,
null,
ApplicationState.NOT_FOUND)
}
}

object ApplicationOperation {
val NOT_FOUND = "APPLICATION_NOT_FOUND"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,43 @@

package org.apache.kyuubi.engine

import java.util
import java.util.concurrent.ConcurrentHashMap

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.informers.ResourceEventHandler
import io.fabric8.kubernetes.client.informers.SharedIndexInformer

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.util.KubernetesUtils

class KubernetesApplicationOperation extends ApplicationOperation with Logging {

@volatile
private var kubernetesClient: KubernetesClient = _

private var driverInformer: SharedIndexInformer[Pod] = _
private var submitTimeout: Long = _

private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
new ConcurrentHashMap[String, ApplicationInfo]
private val deletedQueue: ConcurrentHashMap[Long, String] = new ConcurrentHashMap[Long, String]()

override def initialize(conf: KyuubiConf): Unit = {
info("Start initializing Kubernetes Client.")
kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
case Some(client) =>
info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
// Using Kubernetes Informer to update application state
val informerPeriod = conf.get(KyuubiConf.KUBERNETES_INFORMER_PERIOD)
driverInformer = client.informers().sharedIndexInformerFor(
classOf[Pod],
informerPeriod)
driverInformer.addEventHandler(new DriverPodEventHandler()).start()
info("Start Kubernetes Client Informer.")
client
case None =>
warn("Fail to init Kubernetes Client for Kubernetes Application Operation")
Expand All @@ -55,34 +68,25 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}

override def killApplicationByTag(tag: String): KillResponse = {
if (kubernetesClient != null) {
debug(s"Deleting application info from Kubernetes cluster by $tag tag")
try {
// Need driver only
val podList = findDriverPodByTag(tag)
if (podList.size() != 0) {
val targetPod = podList.get(0)
toApplicationState(targetPod.getStatus.getPhase) match {
case FAILED | UNKNOWN =>
(
false,
s"Target Pod ${targetPod.getMetadata.getName} is in FAILED or UNKNOWN status")
case _ =>
(
!kubernetesClient.pods.withName(targetPod.getMetadata.getName).delete().isEmpty,
s"Operation of deleted appId: ${podList.get(0).getMetadata.getName} is completed")
}
} else {
if (kubernetesClient == null) {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
}
debug(s"Deleting application info from Kubernetes cluster by $tag tag")
try {
val info = appInfoStore.getOrDefault(tag, ApplicationInfo.notFound)
info.state match {
case NOT_FOUND | FAILED | UNKNOWN =>
(
false,
s"Target Pod(tag: $tag) is not found, due to pod have been deleted or not created")
}
} catch {
case e: Exception =>
(false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
s"Target application[tag: $tag] is in ${info.state} status")
case _ =>
(
!kubernetesClient.pods.withName(info.name).delete().isEmpty,
s"Operation of deleted application[appId: ${info.id} ,tag: $tag] is completed")
}
} else {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
} catch {
case e: Exception =>
(false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
}
}

Expand All @@ -92,22 +96,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
debug(s"Getting application info from Kubernetes cluster by $tag tag")
try {
val podList = findDriverPodByTag(tag)
if (podList.size() != 0) {
val pod = podList.get(0)
val info = ApplicationInfo(
// spark pods always tag label `spark-app-selector:<spark-app-id>`
id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
name = pod.getMetadata.getName,
state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
error = Option(pod.getStatus.getReason))
debug(s"Successfully got application info by $tag: $info")
return info
}
// Kyuubi should wait second if pod is not be created
submitTime match {
case Some(time) =>
val elapsedTime = System.currentTimeMillis() - time
val info = appInfoStore.getOrDefault(tag, ApplicationInfo.notFound)
info.state match {
// Kyuubi should wait second if pod is not be created
case NOT_FOUND & submitTime.nonEmpty =>
val elapsedTime = System.currentTimeMillis() - submitTime.get
if (elapsedTime > submitTimeout) {
error(s"Can't find target driver pod by tag: $tag, " +
s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
Expand All @@ -117,8 +110,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
}
case None =>
case NOT_FOUND =>
ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
case _ =>
debug(s"Successfully got application info by $tag: $info")
info
}
} catch {
case e: Exception =>
Expand All @@ -127,22 +123,59 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
}

private def findDriverPodByTag(tag: String): util.List[Pod] = {
val podList = kubernetesClient.pods()
.withLabel(KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY, tag).list().getItems
val size = podList.size()
if (size != 1) {
warn(s"Get Tag: ${tag} Driver Pod In Kubernetes size: ${size}, we expect 1")
override def stop(): Unit = {
try {
if (kubernetesClient != null) {
kubernetesClient.close()
}
if (driverInformer != null) {
driverInformer.stop()
}
} catch {
case e: Exception => error(e.getMessage)
}
podList
}

override def stop(): Unit = {
if (kubernetesClient != null) {
try {
kubernetesClient.close()
} catch {
case e: Exception => error(e.getMessage)
class DriverPodEventHandler extends ResourceEventHandler[Pod] {
private def filter(pod: Pod): Boolean = {
pod.getMetadata.getLabels.containsKey(LABEL_KYUUBI_UNIQUE_KEY)
}

private def updateState(pod: Pod): Unit = {
val metaData = pod.getMetadata
val state = toApplicationState(pod.getStatus.getPhase)
debug(s"Driver Informer change pod: ${metaData.getName} state: $state")
appInfoStore.put(
metaData.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
ApplicationInfo(
id = metaData.getLabels.get(SPARK_APP_ID_LABEL),
name = metaData.getName,
state = toApplicationState(pod.getStatus.getPhase),
error = Option(pod.getStatus.getReason)))
}

private def markDeleted(pod: Pod): Unit = {
deletedQueue.put(
System.currentTimeMillis(),
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY))
}

override def onAdd(pod: Pod): Unit = {
if (filter(pod)) {
updateState(pod)
}
}

override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
if (filter(newPod)) {
updateState(newPod)
}
}

override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = {
if (filter(pod)) {
updateState(pod)
markDeleted(pod)
}
}
}
Expand Down

0 comments on commit 4ab530e

Please sign in to comment.