[SPARK-47495][CORE] Fix primary resource jar added to spark.jars twice under k8s cluster mode

### What changes were proposed in this pull request?

In `SparkSubmit`, on the `isKubernetesClusterModeDriver` code path, stop appending the primary resource to `spark.jars`, so that the primary resource jar is not listed in `spark.jars` twice.

### Why are the changes needed?

#### Context:

To submit a Spark job to Kubernetes in cluster mode, spark-submit is invoked twice. The first time, `SparkSubmit` runs in k8s cluster mode: it appends the primary resource to `spark.jars` and calls `KubernetesClientApplication::start` to create a driver pod. The driver pod then runs spark-submit again with the updated configuration (the same application jar, which is now also listed in `spark.jars`). This time `SparkSubmit` runs in client mode with `spark.kubernetes.submitInDriver` set to `true`. In this mode, all jars in `spark.jars` are downloaded to the driver and their URLs are replaced with driver-local paths. `SparkSubmit` then appends the primary resource to `spark.jars` again. As a result, `spark.jars` contains two entries for the primary resource: one with the original URL the user submitted, the other with the driver-local file path. Later, when the driver starts the `SparkContext`, it copies everything in `spark.jars` to `spark.app.initial.jar.urls` and replaces the driver-local jar paths in `spark.app.initial.jar.urls` with driver file service paths, from which the executors can download those driver-local jars.

#### Issues:
Each executor downloads two copies of the primary resource, one via the original URL the user submitted and one via the driver-local file path, which wastes resources.
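The flow described in the context above can be sketched as a toy model. This is an illustration only, not Spark code: the object, the helper names, the `s3a://` URLs, and the working directory are all hypothetical, and the real download and rewrite logic in `SparkSubmit` is more involved.

```scala
// Toy model of the pre-fix flow; no Spark dependency. All names are hypothetical.
object DuplicateJarSketch {
  // Stand-in for "append the primary resource to spark.jars".
  def appendPrimary(jars: Seq[String], primary: String): Seq[String] =
    jars :+ primary

  // Stand-in for the driver-side download: each URL becomes a driver-local path.
  def localize(jars: Seq[String], workDir: String): Seq[String] =
    jars.map(url => s"$workDir/${url.split('/').last}")

  // spark.jars as it would look after both spark-submit invocations, before the fix.
  def preFixSparkJars(userJars: Seq[String], primary: String, workDir: String): Seq[String] = {
    // First invocation (cluster mode): the primary resource is appended.
    val afterFirstSubmit = appendPrimary(userJars, primary)
    // Second invocation (inside the driver pod): jars are localized ...
    val localized = localize(afterFirstSubmit, workDir)
    // ... and the primary resource is appended a second time with its original URL.
    appendPrimary(localized, primary)
  }

  def main(args: Array[String]): Unit = {
    val jars = preFixSparkJars(
      userJars = Seq("s3a://bucket/dep.jar"), // hypothetical dependency
      primary = "s3a://bucket/app.jar",       // hypothetical primary resource
      workDir = "/opt/spark/work-dir")        // hypothetical driver directory
    jars.foreach(println)
  }
}
```

In this model the result lists `app.jar` twice, once as a driver-local path and once as the original URL, which is the duplication this change removes.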

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45607 from leletan/fix_k8s_submit_jar_distribution.

Lead-authored-by: jiale_tan <jiale_tan@apple.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
jiale_tan and dongjoon-hyun committed Mar 24, 2024
1 parent b9335b9 commit c29d132
Showing 2 changed files with 21 additions and 1 deletion.
@@ -732,9 +732,10 @@ private[spark] class SparkSubmit extends Logging {
     }

     // Add the application jar automatically so the user doesn't have to call sc.addJar
+    // For isKubernetesClusterModeDriver, the jar is already added in the previous spark-submit
     // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
     // For python and R files, the primary resource is already distributed as a regular file
-    if (!isYarnCluster && !args.isPython && !args.isR) {
+    if (!isKubernetesClusterModeDriver && !isYarnCluster && !args.isPython && !args.isR) {
       var jars = sparkConf.get(JARS)
       if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
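As a rough illustration of the patched condition, the guard can be written as a standalone predicate. The `Flags` case class and the object name are hypothetical; in `SparkSubmit` these values are locals and fields of `args`, and the additional `isUserJar` check is left out here. The sketch assumes that `isKubernetesClusterModeDriver` is false during the first, cluster-mode submission and true only for the second run inside the driver pod.

```scala
// Standalone sketch of the guard; not part of the patch. Names are hypothetical.
object PrimaryResourceGuardSketch {
  // Hypothetical bundle of the flags consulted by the condition in the diff.
  final case class Flags(
      isKubernetesClusterModeDriver: Boolean,
      isYarnCluster: Boolean,
      isPython: Boolean,
      isR: Boolean)

  // Mirrors the patched `if` condition: append only when none of the flags is set.
  def shouldAppendPrimaryResource(f: Flags): Boolean =
    !f.isKubernetesClusterModeDriver && !f.isYarnCluster && !f.isPython && !f.isR

  def main(args: Array[String]): Unit = {
    // First spark-submit (assumed: the driver flag is not set yet).
    val firstSubmit = Flags(
      isKubernetesClusterModeDriver = false,
      isYarnCluster = false,
      isPython = false,
      isR = false)
    // Second spark-submit, running inside the driver pod.
    val inDriverPod = firstSubmit.copy(isKubernetesClusterModeDriver = true)

    println(shouldAppendPrimaryResource(firstSubmit)) // prints "true"
    println(shouldAppendPrimaryResource(inDriverPod)) // prints "false"
  }
}
```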
19 changes: 19 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -504,6 +504,25 @@ class SparkSubmitSuite
     }
   }

+  test("SPARK-47495: Not to add primary resource to jars again" +
+    " in k8s client mode & driver runs inside a POD") {
+    val clArgs = Seq(
+      "--deploy-mode", "client",
+      "--proxy-user", "test.user",
+      "--master", "k8s://host:port",
+      "--executor-memory", "1g",
+      "--class", "org.SomeClass",
+      "--driver-memory", "1g",
+      "--conf", "spark.kubernetes.submitInDriver=true",
+      "--jars", "src/test/resources/TestUDTF.jar",
+      "/home/jarToIgnore.jar",
+      "arg1")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (_, _, sparkConf, _) = submit.prepareSubmitEnvironment(appArgs)
+    sparkConf.get("spark.jars").contains("jarToIgnore") shouldBe false
+    sparkConf.get("spark.jars").contains("TestUDTF") shouldBe true
+  }
+
   test("SPARK-33782: handles k8s files download to current directory") {
     val clArgs = Seq(
       "--deploy-mode", "client",