[SPARK-52334][CORE][K8S] update all files, jars, and pyFiles to reference the working directory after they are downloaded #51037
Conversation
Hi @dongjoon-hyun,
dongjoon-hyun left a comment:
Do you think you can write a test case, @TongWei1105?
Can you update this test to handle archives as well?
> Can you update this test to handle archives as well?

Done.
`log4j2.properties` is not an archive file, so it ends up getting copied to the destination (a variant of the existing cases).
I am trying to ensure that the `if (isArchive) {` branch works as expected when the file actually gets unpacked.
> `log4j2.properties` is not an archive file, so it ends up getting copied to the destination (a variant of the existing cases). I am trying to ensure that the `if (isArchive) {` branch works as expected when the file actually gets unpacked.
The isArchive logic does get triggered in this case — the test has been updated to cover that scenario accordingly.
Thank you for your suggestion.
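For readers following the thread, the copy-vs-unpack decision being tested looks roughly like the sketch below. `placeInWorkDir` and its arguments are illustrative stand-ins, not the exact Spark source; Spark's real code calls `Utils.unpack` for the archive branch.

```scala
import java.io.File
import java.nio.file.Files

// Sketch of the branch under test: archives are extracted into the working
// directory, while plain files (like log4j2.properties) are simply copied.
def placeInWorkDir(source: File, dest: File, isArchive: Boolean): Unit = {
  if (isArchive) {
    // In Spark this branch calls Utils.unpack(source, dest) to extract the archive.
    println(s"would unpack $source into $dest")
  } else {
    Files.copy(source.toPath, dest.toPath)
  }
}
```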
Force-pushed: 818ff74 to 4195283
When you have a moment, could you please take another look at this PR? Thanks!
Sorry for the delay, @TongWei1105 - I assume you closed the PR due to lack of traction; if so, reopening it. +CC @dongjoon-hyun, @HyukjinKwon
Force-pushed: 4195283 to a19c2c9
```scala
    localResources
  } else {
    Files.copy(source.toPath, dest.toPath)
    dest.toURI
```
Shouldn't it be `source.toURI`?
Using `dest.toURI`, we are referencing the file names from the current working directory. I mean, yeah, this should also work, but I wonder why we should do it.
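To illustrate the difference being discussed, here is a small self-contained sketch; the temp directories are stand-ins for `/tmp` and `/opt/spark/work-dir/`, not the paths Spark actually uses:

```scala
import java.io.File
import java.nio.file.Files

// Stand-ins for the download dir (/tmp/...) and the container working dir.
val tmpDir  = Files.createTempDirectory("spark-download-").toFile
val workDir = Files.createTempDirectory("work-dir-").toFile

val source = new File(tmpDir, "a.text")
Files.write(source.toPath, "hello".getBytes("UTF-8"))

val dest = new File(workDir, "a.text")
Files.copy(source.toPath, dest.toPath)

// Returning dest.toURI makes downstream consumers see the working-directory
// path; source.toURI would keep pointing at the temporary download location.
println(source.toURI) // file:/.../spark-download-.../a.text
println(dest.toURI)   // file:/.../work-dir-.../a.text
```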
Can you explain how this fixes the issue?
> Can you explain how this fixes the issue?
Thank you for your reply.
In Kubernetes mode, when using `--files` or `--jars`, Spark first stores a copy under the local `/tmp` directory and also copies it to `/opt/spark/work-dir/`. However, when `addFile(file)` is called a second time inside the `SparkContext`, the file path becomes `/opt/spark/work-dir/file`. In `NettyStreamManager`, though, the file entries are still recorded under the original `/tmp` path, so the guard against duplicate file registrations sees a mismatch and throws an exception.
Therefore, I believe that in Kubernetes mode these paths should be unified to `/opt/spark/work-dir/`.
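For reference, the duplicate-registration guard behaves roughly like the following sketch, modeled on `NettyStreamManager.addFile`; the surrounding registry and names are simplified stand-ins, not the exact Spark source:

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

// Sketch: a name-keyed registry that rejects re-registration of the same
// file name under a different path -- the check that fires in this bug.
val files = new ConcurrentHashMap[String, File]()

def addFile(file: File): Unit = {
  val existingPath = files.putIfAbsent(file.getName, file)
  require(existingPath == null || existingPath == file,
    s"File ${file.getName} was already registered with a different path " +
      s"(old path = $existingPath, new path = $file)")
}

// The /tmp copy is registered first (via --files), the work-dir copy second
// (via sc.addFile): same name, different path, so the require above fails.
addFile(new File("/tmp/spark-6aa5129d/a.text"))
addFile(new File("/opt/spark/work-dir/a.text")) // throws IllegalArgumentException
```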
HyukjinKwon left a comment:
OK, I think this looks fine to me, but I'll leave it to @dongjoon-hyun, who uses K8S in production more.
@dongjoon-hyun, when you're free, could you help me review this?
Sorry, guys, for missing the ping here. I can test this today.
dongjoon-hyun left a comment:
The main body itself looks good to me.
Please try to avoid adding new binary files like `archive1.zip`. You can create one simply, like the following, @TongWei1105:
spark/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala (lines 698 to 699 in 7e53ce8):

```scala
val zipFile1 = File.createTempFile("test1_", ".zip", dir)
TestUtils.createJar(Seq(text1, json1), zipFile1)
```
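As a side note, the same kind of fixture can be built with only the JDK; a minimal sketch, assuming a plain zip is enough for the test (the file names here are illustrative, not from the PR):

```scala
import java.io.File
import java.nio.file.Files
import java.util.zip.{ZipEntry, ZipOutputStream}

// Create a tiny zip on the fly instead of checking in a binary fixture.
val dir = Files.createTempDirectory("spark-test-").toFile
val zipFile = new File(dir, "archive1.zip")
val out = new ZipOutputStream(Files.newOutputStream(zipFile.toPath))
try {
  out.putNextEntry(new ZipEntry("a.text"))
  out.write("hello".getBytes("UTF-8"))
  out.closeEntry()
} finally {
  out.close()
}
```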
Force-pushed: a19c2c9 to 0b15e2c
@dongjoon-hyun done
dongjoon-hyun left a comment:
+1, LGTM. Thank you, @TongWei1105 and all.
Merged to master/4.1 for Apache Spark 4.1.0.
[SPARK-52334][CORE][K8S] update all files, jars, and pyFiles to reference the working directory after they are downloaded
### What changes were proposed in this pull request?
This PR fixes a bug where submitting a Spark job using the `--files` option and also calling `SparkContext.addFile()` for a file with the same name causes Spark to throw an exception:
`Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: File a.text was already registered with a different path (old path = /tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path = /opt/spark/work-dir/a.text`
### Why are the changes needed?
1. Submit a Spark application using `spark-submit` with the `--files` option:
`bin/spark-submit --files s3://bucket/a.text --class testDemo app.jar`
2. In the testDemo application code, call:
`sc.addFile("a.text", true)`
This works correctly in YARN mode, but throws an error in Kubernetes mode.
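To make the repro concrete, a minimal driver along these lines would exhibit the failure; the class name matches the steps above, but the body is an illustrative sketch, not code from the PR:

```scala
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

object testDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("testDemo").getOrCreate()

    // a.text was already distributed via --files at submit time; registering
    // it again by name is what tripped the path-mismatch check on Kubernetes.
    spark.sparkContext.addFile("a.text", true)
    println(SparkFiles.get("a.text"))

    spark.stop()
  }
}
```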
After [SPARK-33782](https://issues.apache.org/jira/browse/SPARK-33782), in Kubernetes mode, `--files`, `--jars`, `--archives`, and `--py-files` are all downloaded to the working directory.
However, in the code, `args.files = filesLocalFiles`, and `filesLocalFiles` refers to a temporary download path, not the working directory.
This causes issues when user code like testDemo calls `sc.addFile("a.text", true)`, resulting in an error such as:
`Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: File a.text was already registered with a different path (old path = /tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path = /opt/spark/work-dir/a.text`
### Does this PR introduce _any_ user-facing change?
This issue can be resolved after this PR.
### How was this patch tested?
Existing UTs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51037 from TongWei1105/SPARK-52334.
Authored-by: TongWei1105 <vvtwow@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit dd418e3)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>