
Commit dd418e3

TongWei1105 authored and dongjoon-hyun committed
[SPARK-52334][CORE][K8S] Update all files, jars, and pyFiles to reference the working directory after they are downloaded
### What changes were proposed in this pull request?

This PR fixes a bug where submitting a Spark job with the `--files` option and also calling `SparkContext.addFile()` for a file with the same name causes Spark to throw:

`Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: File a.text was already registered with a different path (old path = /tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path = /opt/spark/work-dir/a.text`

### Why are the changes needed?

1. Submit a Spark application using spark-submit with the `--files` option:
   `bin/spark-submit --files s3://bucket/a.text --class testDemo app.jar`
2. In the testDemo application code, call `sc.addFile("a.text", true)`.

This works correctly in YARN mode but throws an error in Kubernetes mode. Since [SPARK-33782](https://issues.apache.org/jira/browse/SPARK-33782), in Kubernetes mode `--files`, `--jars`, `--archives`, and `--py-files` are all downloaded to the working directory. However, in the code `args.files = filesLocalFiles`, and `filesLocalFiles` refers to the temporary download path, not the working directory. As a result, when user code such as testDemo calls `sc.addFile("a.text", true)`, Spark fails with an error like:

`Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: File a.text was already registered with a different path (old path = /tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path = /opt/spark/work-dir/a.text`

### Does this PR introduce _any_ user-facing change?

The issue described above is resolved by this PR.

### How was this patch tested?

Existing UTs, plus a new test added in `SparkSubmitSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51037 from TongWei1105/SPARK-52334.

Authored-by: TongWei1105 <vvtwow@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent bba8bb8 commit dd418e3
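
For background on the exception quoted above: the Spark driver keeps a registry from a file's name to the path it was first served from, and rejects a second registration of the same name under a different path. The sketch below is illustrative only (the class and method names are invented here, not Spark's internal API); it shows how advertising the temporary download path for `--files` and then calling `sc.addFile()` on the working-directory copy of the same file name trips exactly this kind of `require` check.

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only -- not Spark's actual driver file server.
// It models a name -> path registry that refuses to re-register a file
// name under a different path, which is the check behind the
// "was already registered with a different path" message.
class FileRegistrySketch {
  private val files = new ConcurrentHashMap[String, File]()

  def register(file: File): Unit = {
    val existing = files.putIfAbsent(file.getName, file)
    require(existing == null || existing == file,
      s"File ${file.getName} was already registered with a different path " +
        s"(old path = $existing, new path = $file)")
  }
}

object FileRegistrySketch {
  def main(args: Array[String]): Unit = {
    val registry = new FileRegistrySketch
    // What happened on K8s before this fix: --files registered the temporary copy...
    registry.register(new File("/tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text"))
    // ...and a later sc.addFile("a.text") resolved to the working-directory copy,
    // so the require above fails with an IllegalArgumentException.
    registry.register(new File("/opt/spark/work-dir/a.text"))
  }
}
```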

File tree: 2 files changed (+46 lines, -2 lines)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -448,14 +448,16 @@ private[spark] class SparkSubmit extends Logging {
                 log" from ${MDC(LogKeys.SOURCE_PATH, source)}" +
                 log" to ${MDC(LogKeys.DESTINATION_PATH, dest)}")
             Utils.deleteRecursively(dest)
-            if (isArchive) {
+            val resourceUri = if (isArchive) {
               Utils.unpack(source, dest)
+              localResources
             } else {
               Files.copy(source.toPath, dest.toPath)
+              dest.toURI
             }
             // Keep the URIs of local files with the given fragments.
             Utils.getUriBuilder(
-              localResources).fragment(resolvedUri.getFragment).build().toString
+              resourceUri).fragment(resolvedUri.getFragment).build().toString
         } ++ avoidDownloads.map(_.toString)).mkString(",")
       }
```
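
Reading the hunk above: previously the rewritten entry for every downloaded resource was built from `localResources`, i.e. the URI of the copy in the temporary download directory, even though the bytes had already been copied or unpacked into the working directory. With this change, plain files, jars, and pyFiles advertise the working-directory copy (`dest.toURI`), while archives keep the original `localResources` URI because `Utils.unpack` already extracts them into `dest`. A minimal sketch of that decision, with simplified names (`advertisedUri` and `tmpDownload` are illustrative, not Spark's actual helpers):

```scala
import java.io.File
import java.net.URI
import java.nio.file.Files

// Simplified sketch of the behavioral change above -- not Spark's actual code.
// `tmpDownload` stands in for the file fetched into the temporary directory and
// `localResources` for the URI Spark previously advertised for it.
object WorkingDirUriSketch {
  def advertisedUri(tmpDownload: File, localResources: URI, isArchive: Boolean): URI = {
    val dest = new File(new File(".").getCanonicalFile, tmpDownload.getName)
    if (isArchive) {
      // Archives: their content is unpacked into the working directory elsewhere,
      // so the advertised URI stays as it was.
      localResources
    } else {
      // Files, jars, pyFiles: copy into the working directory and advertise that copy,
      // so spark.files and a later sc.addFile() agree on the same canonical path.
      Files.copy(tmpDownload.toPath, dest.toPath)
      dest.toURI
    }
  }
}
```

With this in place, `args.files` and friends point at the working directory (e.g. `/opt/spark/work-dir`), so the registration check sketched earlier sees the same path twice and passes.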

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 42 additions & 0 deletions
```diff
@@ -1845,6 +1845,48 @@ class SparkSubmitSuite
     assert(classpath.contains("."))
   }
 
+  test("SPARK-52334: Update all files, jars, and pyFiles to" +
+    "reference the working directory after they are downloaded") {
+    withTempDir { dir =>
+      val text1 = File.createTempFile("test1_", ".txt", dir)
+      val zipFile1 = File.createTempFile("test1_", ".zip", dir)
+      TestUtils.createJar(Seq(text1), zipFile1)
+      val testFile = "test_metrics_config.properties"
+      val testPyFile = "test_metrics_system.properties"
+      val testJar = "TestUDTF.jar"
+      val clArgs = Seq(
+        "--deploy-mode", "client",
+        "--proxy-user", "test.user",
+        "--master", "k8s://host:port",
+        "--executor-memory", "5g",
+        "--class", "org.SomeClass",
+        "--driver-memory", "4g",
+        "--conf", "spark.kubernetes.namespace=spark",
+        "--conf", "spark.kubernetes.driver.container.image=bar",
+        "--conf", "spark.kubernetes.submitInDriver=true",
+        "--files", s"src/test/resources/$testFile",
+        "--py-files", s"src/test/resources/$testPyFile",
+        "--jars", s"src/test/resources/$testJar",
+        "--archives", s"${zipFile1.getAbsolutePath}#test_archives",
+        "/home/thejar.jar",
+        "arg1")
+      val appArgs = new SparkSubmitArguments(clArgs)
+      val _ = submit.prepareSubmitEnvironment(appArgs)
+
+      appArgs.files should be (Utils.resolveURIs(s"$testFile,$testPyFile"))
+      appArgs.pyFiles should be (Utils.resolveURIs(testPyFile))
+      appArgs.jars should be (Utils.resolveURIs(testJar))
+      appArgs.archives should be (Utils.resolveURIs(s"${zipFile1.getAbsolutePath}#test_archives"))
+
+      Files.isDirectory(Paths.get("test_archives")) should be(true)
+      Files.delete(Paths.get(testFile))
+      Files.delete(Paths.get(testPyFile))
+      Files.delete(Paths.get(testJar))
+      Files.delete(Paths.get(s"test_archives/${text1.getName}"))
+      Files.delete(Paths.get("test_archives/META-INF/MANIFEST.MF"))
+    }
+  }
+
   // Requires Python dependencies for Spark Connect. Should be enabled by default.
   ignore("Spark Connect application submission (Python)") {
     val pyFile = File.createTempFile("remote_test", ".py")
```

0 commit comments
