Skip to content

Commit

Permalink
[KYUUBI apache#6028] Exited spark-submit process should not block bat…
Browse files Browse the repository at this point in the history
…ch submit queue

# 🔍 Description
## Issue References 🔗

While enabling batch implementation V2 with the following configurations
```
kyuubi.batch.impl.version=2
kyuubi.batch.submitter.enabled=true
kyuubi.batch.submitter.threads=48
spark.master=yarn
spark.submit.deployMode=cluster
spark.yarn.submit.waitAppCompletion=false
```

I found that the batch jobs will be blocked in the DB queue once a YARN queue has no resources, this brings an issue, the subsequential batch jobs that are going to be submitted to another YARN queue also be queued in DB, rather than YARN queue.

```
mysql> select state, engine_state, count(1) from metadata where state in ('INITIALIZED', 'PENDING', 'RUNNING') group by state, engine_state;
+-------------+--------------+----------+
| state       | engine_state | count(1) |
+-------------+--------------+----------+
| INITIALIZED | NULL         |      166 |
| PENDING     | NULL         |        1 |
| RUNNING     | PENDING      |      148 |
| RUNNING     | RUNNING      |      415 |
+-------------+--------------+----------+
```

## Describe Your Solution 🔧

The submitter queue whose size is controlled by `kyuubi.batch.submitter.threads` is designed to address the `spark-submit` process concurrency issue, too many `spark-submit` processes may run out of the Kyuubi server's node CPU/memory resources and eventually crash the service. For Spark YARN cluster mode, if set `spark.yarn.submit.waitAppCompletion=false`, the local `spark-submit` process exits immediately once the Application goes ACCEPTED status, even no resource could be allocated for the AM container, we should not block such case in submitter queue.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Pass GA, and roll out into internal cluster.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes apache#6028 from pan3793/batch-submit.

Closes apache#6028

05fcc75 [Cheng Pan] Exited spark-submit process should not block batch submit queue

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
pan3793 authored and zhaohehuhu committed Feb 5, 2024
1 parent 879e8d5 commit 3496360
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", true, conf)
val e = intercept[KyuubiException](builder.validateConf)
val e = intercept[KyuubiException](builder.validateConf())
assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" +
"working-with-objects/names/#dns-subdomain-names and the value length <= 237")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private[kyuubi] class EngineRef(
acquiredPermit = true
val redactedCmd = builder.toString
info(s"Launching engine:\n$redactedCmd")
builder.validateConf
builder.validateConf()
val process = builder.start
var exitValue: Option[Int] = None
var lastApplicationInfo: Option[ApplicationInfo] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}

import scala.collection.JavaConverters._

import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase

Expand Down Expand Up @@ -166,9 +165,8 @@ trait ProcBuilder {
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
private var process: Process = _
@VisibleForTesting
@volatile private[kyuubi] var processLaunched: Boolean = _
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false

private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
Expand Down Expand Up @@ -206,7 +204,7 @@ trait ProcBuilder {
file
}

def validateConf: Unit = {}
def validateConf(): Unit = {}

final def start: Process = synchronized {
process = processBuilder.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class SparkProcessBuilder(
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}

override def validateConf: Unit = Validator.validateConf(conf)
override def validateConf(): Unit = Validator.validateConf(conf)

// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class BatchJobSubmission(
getOperationLog)
}

def startupProcessAlive: Boolean =
builder.processLaunched && Option(builder.process).exists(_.isAlive)

override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,13 @@ class KyuubiBatchService(
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager
case Some(ApplicationState.PENDING) => false
// app that is pending in resource manager while the local startup
// process is alive. For example, in Spark YARN cluster mode, if set
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits immediately once Application goes ACCEPTED status,
// even no resource could be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class KyuubiBatchSession(
batchArgs,
metadata)

def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive

private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
sessionManager.getMetadataRequestsRetryRef(batchId).foreach {
Expand Down

0 comments on commit 3496360

Please sign in to comment.