diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala index 93153ad3620..a32a45d6c27 100644 --- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala +++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala @@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid => conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid) val builder = new SparkProcessBuilder("test", true, conf) - val e = intercept[KyuubiException](builder.validateConf) + val e = intercept[KyuubiException](builder.validateConf()) assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" + s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" + "working-with-objects/names/#dns-subdomain-names and the value length <= 237") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 0d1b0adc775..0bb13b049f2 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -225,7 +225,7 @@ private[kyuubi] class EngineRef( acquiredPermit = true val redactedCmd = builder.toString info(s"Launching engine:\n$redactedCmd") - builder.validateConf + builder.validateConf() val process = builder.start var exitValue: Option[Int] = None var lastApplicationInfo: Option[ApplicationInfo] = None diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 8a8f59ffe99..c8c3f9c39f4 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths} import scala.collection.JavaConverters._ -import com.google.common.annotations.VisibleForTesting import com.google.common.collect.EvictingQueue import org.apache.commons.lang3.StringUtils.containsIgnoreCase @@ -166,9 +165,8 @@ trait ProcBuilder { // Visible for test @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true private var logCaptureThread: Thread = _ - private var process: Process = _ - @VisibleForTesting - @volatile private[kyuubi] var processLaunched: Boolean = _ + @volatile private[kyuubi] var process: Process = _ + @volatile private[kyuubi] var processLaunched: Boolean = false private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized { val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT) @@ -206,7 +204,7 @@ trait ProcBuilder { file } - def validateConf: Unit = {} + def validateConf(): Unit = {} final def start: Process = synchronized { process = processBuilder.start() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 10e1e6ce9e5..4c06d7951a3 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -302,7 +302,7 @@ class SparkProcessBuilder( conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY)) } - override def validateConf: Unit = Validator.validateConf(conf) + override def validateConf(): Unit = Validator.validateConf(conf) // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit = { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 276fe344600..c29065f192d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -100,6 +100,9 @@ class BatchJobSubmission( getOperationLog) } + def startupProcessAlive: Boolean = + builder.processLaunched && Option(builder.process).exists(_.isAlive) + override def currentApplicationInfo(): Option[ApplicationInfo] = { if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) { return _applicationInfo diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index 2bfbbce2ab7..e2736267f7e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -94,8 +94,13 @@ class KyuubiBatchService( metadata.appState match { // app that is not submitted to resource manager case None | Some(ApplicationState.NOT_FOUND) => false - // app that is pending in resource manager - case Some(ApplicationState.PENDING) => false + // app that is pending in resource manager while the local startup + // process is alive. For example, in Spark YARN cluster mode, if set + // spark.yarn.submit.waitAppCompletion=false, the local spark-submit + // process exits immediately once Application goes ACCEPTED status, + // even no resource could be allocated for the AM container. + case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive => + false // not sure, added for safe case Some(ApplicationState.UNKNOWN) => false case _ => true diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala index 531bbc3af87..4ac84c1d0dc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala @@ -111,6 +111,8 @@ class KyuubiBatchSession( batchArgs, metadata) + def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive + private def waitMetadataRequestsRetryCompletion(): Unit = { val batchId = batchJobSubmissionOp.batchId sessionManager.getMetadataRequestsRetryRef(batchId).foreach {