[SPARK-2713] Executors of same application in same host should only download files & jars once #1616
Conversation
Can one of the admins verify this patch?
fetchFile(url, localDir, conf, securityMgr)
Files.move(new File(localDir, fileName), cachedFile)
}
lock.release()
If the move throws an exception, the lock may never be released. You should wrap the release call in a finally block.
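The suggested pattern can be sketched like this (a hypothetical helper, not the actual patch code): acquiring the lock, running the body, and releasing in a finally block so the lock survives no failure mode.

```scala
import java.io.{File, RandomAccessFile}

// Hypothetical helper illustrating the suggestion: the lock is always
// released, even when the body (e.g. the download or Files.move) throws.
def withFileLock[T](lockFile: File)(body: => T): T = {
  val raf = new RandomAccessFile(lockFile, "rw")
  val lock = raf.getChannel.lock()  // blocks until the exclusive lock is held
  try {
    body
  } finally {
    lock.release()  // runs on both normal return and exception
    raf.close()
  }
}
```

Without the finally, an exception between `lock()` and `release()` would leave the lock file locked for the lifetime of the process.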
Thanks @harishreedharan, done.
Jenkins, this is ok to test.
QA tests have started for PR 1616. This patch merges cleanly.
QA results for PR 1616:
@JoshRosen more comments?
This uses FileLock as its locking mechanism. According to those docs (emphasis mine),
Can you comment on whether this approach is safe if we're using advisory locks, and maybe add that comment to the source code?
QA tests have started for PR 1616. This patch merges cleanly.
QA results for PR 1616:
@JoshRosen added comment.
Thanks for commenting. I now realize that my concern about advisory locking was a little misguided, since only cooperating Spark processes will be coordinating through the lock file.
This seems like an alright fix and I'd like to get it into a release, but I'm concerned that this doesn't correctly handle every possible feature of

For example, there's some code in

We could try to special-case fix this by moving the decompression logic into

Also, do you think we should just replace
QA tests have started for PR 1616. This patch merges cleanly.
QA results for PR 1616:
Thanks @JoshRosen, sorry I missed the important operation (and I missed

I added a new commit.
@JoshRosen any more comments?
Thanks a bunch for updating this; this seems like an important fix and I'd like to try to get it included soon in a release. I'll try my best to review this tomorrow and merge it if it looks good.
@JoshRosen do you have time to review it?
@JoshRosen if you do merge this, please only into master and not 1.1... we are only fixing major regressions in 1.1 right now.
@@ -317,13 +317,58 @@ private[spark] object Utils extends Logging {
}

/**
 * Copy cached file to targetDir, if not exists, download it from url firstly.
Minor nitpick on naming, but I think it's confusing to have a method named fetchCachedFile with an option that has to be explicitly set in order to use the cache. I'd prefer to name this fetchFile, and rename the other method to something like doFetchFile or _fetchFile.

When fixing the merge conflict, do you mind moving the comment from the old fetchFile to here? I think the most comprehensive documentation should be on the public function, not the private one. I'd say something like
/**
 * Download a file requested by the executor. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
*
* If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
* executors running the same application.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
Hey, sorry to drop the ball on this review. Things got really busy during the 1.1.0 QA process, but I'm slowly getting back to my reviews now. Do you think there's any potential for conflicts between multiple applications that attempt to add files with the same name but different contents? Different applications will share the same local directory. Maybe the

/cc @andrewor14, do you have any thoughts on this?
Actually, I don't think the timestamp will help us here: If app A and B simultaneously add files named
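One way to sidestep the collision would be to include the application ID in the cache key, so same-named files from different applications never map to the same cached path. This is a hypothetical sketch of that idea, not what the patch itself does:

```scala
// Hypothetical sketch: if the cache key includes the application ID (plus
// the URL and timestamp), app A and app B adding different files under the
// same name can never collide in the shared cache directory.
def cachedFileName(appId: String, url: String, timestamp: Long): String =
  s"${appId}_${url.hashCode}_${timestamp}_cache"
```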
 * If useCache == false, download file to targetDir directly.
 */
def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
    timestamp: Long, useCache: Boolean) {
Style should be:
def fetchCachedFile(
url: String,
targetDir: File,
...
useCache: Boolean) {
...
}
Yes, it does seem like a problem if multiple simultaneous applications share the same files. Did we handle that even before this patch? I haven't dug deep into this, but should we have some kind of application-specific directory for fetching files?
@andrewor14 I don't think that it was a problem before, but the reason is perhaps a little subtle: The old

This PR uses that same code path to perform the actual download. The potential conflict occurs because
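The conflict-free download path referred to here (fetch into a uniquely named temp file, then rename into place) can be sketched roughly as follows; the names are illustrative and `download` stands in for the real HTTP/HDFS fetch logic:

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Sketch of the download-then-rename pattern: the temp file name is unique
// per fetcher, and the final rename is a single filesystem operation, so
// concurrent fetchers never expose a partially written file.
def fetchViaTempFile(fileName: String, targetDir: File)(download: File => Unit): File = {
  val tmp = File.createTempFile("fetch", ".tmp", targetDir)  // unique name
  download(tmp)
  val dest = new File(targetDir, fileName)
  Files.move(tmp.toPath, dest.toPath, StandardCopyOption.REPLACE_EXISTING)
  dest
}
```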
I tested the patch in yarn mode, and the

BTW: The
 * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
 *
 * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
 * across executors running the same application. `useCache` is used mainly for
 * the the executors, not in local mode.
the the
"and" not in local mode
Done, thanks.
QA tests have started for PR 1616 at commit
QA tests have finished for PR 1616 at commit
Test PASSed.
@andrewor14 more comments?
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
  hadoopConf)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
  env.securityManager, hadoopConf, timestamp, useCache = true)
I just noticed, if this isn't meant to be used in local mode, shouldn't this be useCache = !isLocal?
If so, it would be good if you could add a small comment to explain here that the cache is not needed for local mode because there is no fetching involved.
Thanks @andrewor14, done.
Hey yeah @li-zhihui, sorry this slipped on our end. This LGTM except for the one comment I made just now. I think the intention is that executors running in local mode shouldn't have to use the cache, as expressed in your javadoc for
QA tests have started for PR 1616 at commit
Tests timed out for PR 1616 at commit
Test FAILed.
retest this please
QA tests have started for PR 1616 at commit
QA tests have finished for PR 1616 at commit
Test FAILed.
@andrewor14 I guess the failure is unrelated to the patch. But I don't know why it failed again; can you give me some advice?
retest this please
Yeah, pyspark tests are kinda flaky. There's no way this patch could have caused it.
Test build #22147 has started for PR 1616 at commit
Test build #22147 has finished for PR 1616 at commit
Test PASSed.
Ok cool, I'm merging this. Thanks @li-zhihui
If Spark launched multiple executors on one host for one application, every executor would download its dependent files and jars (if not using a local: URL) independently. This could result in huge latency. In my case, it caused 20 seconds of latency to download the dependent jars (about 17 MB) when I launched 32 executors on each host (4 hosts in total).

This patch caches downloaded files and jars so that executors on the same host download them only once, reducing network traffic and download latency. In my case, the latency was reduced from 20 seconds to less than 1 second.
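The mechanism can be sketched roughly as follows. This is an illustrative outline, not the patch itself (the real change lives in Utils.fetchFile and combines a per-file lock file with a shared cache directory):

```scala
import java.io.{File, RandomAccessFile}
import java.nio.file.{Files, StandardCopyOption}

// Rough sketch of the per-host cache: the first executor to take the file
// lock downloads into the shared cache directory; later executors on the
// same host find the cached copy and just copy it into their own directory.
def fetchWithCache(fileName: String, cacheDir: File, targetDir: File)
                  (download: File => Unit): File = {
  val cachedFile = new File(cacheDir, fileName + "_cache")
  val lockFile = new File(cacheDir, fileName + "_lock")
  val raf = new RandomAccessFile(lockFile, "rw")
  val lock = raf.getChannel.lock()  // serializes executors on this host
  try {
    if (!cachedFile.exists()) download(cachedFile)  // first executor only
  } finally {
    lock.release()
    raf.close()
  }
  val target = new File(targetDir, fileName)
  Files.copy(cachedFile.toPath, target.toPath, StandardCopyOption.REPLACE_EXISTING)
  target
}
```

With 32 executors per host, only the lock winner pays the download cost; the other 31 perform a local copy.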