Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exited spark-submit process should not block batch submit queue #6028

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", true, conf)
val e = intercept[KyuubiException](builder.validateConf)
val e = intercept[KyuubiException](builder.validateConf())
assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" +
"working-with-objects/names/#dns-subdomain-names and the value length <= 237")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private[kyuubi] class EngineRef(
acquiredPermit = true
val redactedCmd = builder.toString
info(s"Launching engine:\n$redactedCmd")
builder.validateConf
builder.validateConf()
val process = builder.start
var exitValue: Option[Int] = None
var lastApplicationInfo: Option[ApplicationInfo] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}

import scala.collection.JavaConverters._

import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase

Expand Down Expand Up @@ -166,9 +165,8 @@ trait ProcBuilder {
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
private var process: Process = _
@VisibleForTesting
@volatile private[kyuubi] var processLaunched: Boolean = _
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false

private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
Expand Down Expand Up @@ -206,7 +204,7 @@ trait ProcBuilder {
file
}

def validateConf: Unit = {}
def validateConf(): Unit = {}

final def start: Process = synchronized {
process = processBuilder.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class SparkProcessBuilder(
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}

override def validateConf: Unit = Validator.validateConf(conf)
override def validateConf(): Unit = Validator.validateConf(conf)

// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class BatchJobSubmission(
getOperationLog)
}

/**
 * Whether the process started by `builder` is still running.
 *
 * @return true only when `builder.processLaunched` is set and `builder.process`
 *         is non-null and reports itself as alive; false otherwise.
 */
def startupProcessAlive: Boolean =
builder.processLaunched && Option(builder.process).exists(_.isAlive)

override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,13 @@ class KyuubiBatchService(
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager
case Some(ApplicationState.PENDING) => false
// app that is pending in the resource manager while the local startup
// process is still alive. For example, in Spark YARN cluster mode with
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits as soon as the application reaches the ACCEPTED state,
// even if no resources can be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
ulysses-you marked this conversation as resolved.
Show resolved Hide resolved
// not sure; added to be safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class KyuubiBatchSession(
batchArgs,
metadata)

/** Delegates to `batchJobSubmissionOp.startupProcessAlive` and returns its result. */
def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive

private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
sessionManager.getMetadataRequestsRetryRef(batchId).foreach {
Expand Down
Loading