
[SPARK-2713] Executors of same application in same host should only download files & jars once #1616


Closed
li-zhihui wants to merge 13 commits

Conversation

li-zhihui (Contributor)

If Spark launches multiple executors on one host for one application, every executor downloads the application's dependent files and jars independently (unless a local: URL is used). This can result in significant latency. In my case, downloading the dependent jars (about 17 MB) took about 20 seconds when I launched 32 executors on each host (4 hosts in total).

This patch caches downloaded files and jars so that the executors on a host share one copy, reducing network traffic and download latency. In my case, the latency dropped from 20 seconds to under 1 second.
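
For readers skimming the diff, the mechanism is roughly as follows. This is a minimal sketch with hypothetical names (fetchThroughCache, downloadToFile), not the patch's exact API; host-wide locking is elided here and comes up in the inline review below.

    import java.io.File
    import java.net.URL
    import com.google.common.io.{Files, Resources}

    object FetchCacheSketch {
      // Executors of the same application on one host share cacheDir, so each
      // dependency crosses the network at most once per host.
      def fetchThroughCache(url: String, fileName: String,
          cacheDir: File, targetDir: File): Unit = {
        val cachedFile = new File(cacheDir, fileName)
        if (!cachedFile.exists()) {
          // Only the first executor on the host downloads; coordination through
          // a host-wide file lock is elided in this sketch.
          downloadToFile(url, cachedFile)
        }
        // Every executor then copies from the local cache into its own directory.
        Files.copy(cachedFile, new File(targetDir, fileName))
      }

      // Stand-in for Spark's real fetch logic (which also handles HDFS, etc.).
      private def downloadToFile(url: String, dest: File): Unit =
        Files.write(Resources.toByteArray(new URL(url)), dest)
    }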

@AmplabJenkins

Can one of the admins verify this patch?

fetchFile(url, localDir, conf, securityMgr)
Files.move(new File(localDir, fileName), cachedFile)
}
lock.release()
harishreedharan (Contributor)
If the move throws an exception, the lock may never be released. You should wrap the release call in a finally block.
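
Concretely, the suggested shape, as a sketch reusing identifiers from the hunk above (lockChannel stands for the FileChannel backing the lock file):

    val lock = lockChannel.lock()
    try {
      if (!cachedFile.exists()) {
        fetchFile(url, localDir, conf, securityMgr)
        Files.move(new File(localDir, fileName), cachedFile)
      }
    } finally {
      // Runs even if fetchFile or the move throws, so the lock is always released.
      lock.release()
    }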

li-zhihui (Contributor, Author)

Thanks @harishreedharan, done.

@JoshRosen (Contributor)

Jenkins, this is ok to test.

@SparkQA

SparkQA commented Jul 29, 2014

QA tests have started for PR 1616. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17391/consoleFull

@SparkQA

SparkQA commented Jul 30, 2014

QA results for PR 1616:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17391/consoleFull

@li-zhihui (Contributor, Author)

@JoshRosen more comments?

@JoshRosen (Contributor)

This uses FileLock as its locking mechanism. According to the FileLock docs,

Whether or not a lock actually prevents another program from accessing the content of the locked region is system-dependent and therefore unspecified. The native file-locking facilities of some systems are merely advisory, meaning that programs must cooperatively observe a known locking protocol in order to guarantee data integrity. On other systems native file locks are mandatory, meaning that if one program locks a region of a file then other programs are actually prevented from accessing that region in a way that would violate the lock. On yet other systems, whether native file locks are advisory or mandatory is configurable on a per-file basis. To ensure consistent and correct behavior across platforms, it is strongly recommended that the locks provided by this API be used as if they were advisory locks.

Can you comment on whether this approach is safe if we're using advisory locks, and maybe add that comment to the source code?
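
For reference, a cooperative use of FileLock looks roughly like this; a minimal sketch, and it only protects against processes that acquire the same lock file, which is exactly the advisory caveat:

    import java.io.{File, RandomAccessFile}

    // Every participating process must route cache access through this helper;
    // on platforms where locks are advisory, nothing stops a process that doesn't.
    def withFileLock[T](lockFile: File)(body: => T): T = {
      val raf = new RandomAccessFile(lockFile, "rw")
      try {
        val lock = raf.getChannel.lock()  // blocks until the lock is acquired
        try body finally lock.release()
      } finally raf.close()
    }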

@SparkQA

SparkQA commented Aug 4, 2014

QA tests have started for PR 1616. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17852/consoleFull

@SparkQA

SparkQA commented Aug 4, 2014

QA results for PR 1616:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17852/consoleFull

@li-zhihui (Contributor, Author)

@JoshRosen added comment.

@JoshRosen (Contributor)

Thanks for commenting. I now realize that my concern about advisory locking was a little misguided, since only cooperating Spark processes will be coordinating through the lock file.

@JoshRosen (Contributor)

This seems like an alright fix and I'd like to get it into a release, but I'm concerned that this doesn't correctly handle every possible feature of fetchFile.

For example, there's some code in fetchFile to automatically decompress .tar.gz files. I don't remember why this code was added (or whether it's actually correct, since it seems to assume that files are downloaded into the current working directory), but I'm not sure that fetchCachedFile will properly handle that case; it seems like it would only copy the .tar.gz file without decompressing it in the executor's directory.

We could try to special-case fix this by moving the decompression logic into fetchCachedFile, but I'm worried that it will make fetchFile even harder to understand. I think that fetchFile might be due for a refactoring.

Also, do you think we should just replace fetchFile with fetchCachedFile and keep the uncached version private?
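
In sketch form, the concern is that any post-download steps in the uncached path must also run when the bytes come from the cache, or the executor is left with just the raw archive. Roughly (untar is a stand-in for whatever decompression fetchFile actually does; the chmod mirrors the call mentioned later in this thread):

    import java.io.File
    import org.apache.hadoop.fs.FileUtil

    // Steps that must run whether the file came from the network or the cache.
    def postFetch(targetFile: File, targetDir: File): Unit = {
      val name = targetFile.getName
      if (name.endsWith(".tar.gz") || name.endsWith(".tgz")) {
        untar(targetFile, targetDir)
      }
      // fetchFile also marks fetched files executable.
      FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
    }

    // Hypothetical helper: shell out to tar, as a sketch only.
    def untar(archive: File, dest: File): Unit = {
      val p = new ProcessBuilder("tar", "-xzf", archive.getAbsolutePath)
        .directory(dest).inheritIO().start()
      p.waitFor()
    }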

@SparkQA

SparkQA commented Aug 5, 2014

QA tests have started for PR 1616. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17915/consoleFull

@SparkQA

SparkQA commented Aug 5, 2014

QA results for PR 1616:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17915/consoleFull

@li-zhihui (Contributor, Author)

Thanks @JoshRosen, sorry, I missed that important operation (and I missed FileUtil.chmod(targetFile.getAbsolutePath, "a+x") too).

I added a new commit.

@li-zhihui (Contributor, Author)

@JoshRosen any more comments?

@JoshRosen (Contributor)

Thanks a bunch for updating this; this seems like an important fix and I'd like to try to get it included soon in a release. I'll try my best to review this tomorrow and merge it if it looks good.

@li-zhihui (Contributor, Author)

@JoshRosen do you have time to review it?

@pwendell (Contributor)

@JoshRosen if you do merge this please only into master and not 1.1... we are only fixing major regressions in 1.1 right now.

@@ -317,13 +317,58 @@ private[spark] object Utils extends Logging {
}

/**
* Copy cached file to targetDir, if not exists, download it from url firstly.
Contributor
Minor nitpick on naming, but I think it's confusing to have a method named fetchCachedFile with an option that has to be explicitly set in order to use the cache. I'd prefer to name this fetchFile, and rename the other method to something like doFetchFile or _fetchFile.

When fixing the merge conflict, do you mind moving the comment from the old fetchFile to here? I think the most comprehensive documentation should be on the public function, not the private one. I'd say something like

/**
    * Download a file requested by the executor. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
    * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
    * executors running the same application.
    *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
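
Under that naming, the public entry point would have roughly this shape (a sketch; fetchFromCache is a hypothetical name for the cache-aware path):

    def fetchFile(
        url: String,
        targetDir: File,
        conf: SparkConf,
        securityMgr: SecurityManager,
        timestamp: Long,
        useCache: Boolean) {
      if (useCache) {
        fetchFromCache(url, targetDir, conf, securityMgr, timestamp)  // hypothetical
      } else {
        doFetchFile(url, targetDir, conf, securityMgr)  // the old fetchFile, renamed
      }
    }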

@JoshRosen (Contributor)

Hey, sorry to drop the ball on this review. Things got really busy during the 1.1.0 QA process, but I'm slowly getting back to my reviews now.

Do you think there's any potential for conflicts between multiple applications that attempt to add files with the same name but different contents? Different applications will share the same local directory. Maybe the timestamp takes care of this, assuming that it's unlikely for us to have timestamp collisions.

/cc @andrewor14, do you have any thoughts on this?

@JoshRosen (Contributor)

Actually, I don't think the timestamp will help us here:

If app A and B simultaneously add files named foo.txt and simultaneously attempt to download this file on the same worker (from different executors), then both will see that the cached file doesn't exist and both will attempt to download the file by calling the old fetchFile with the same name and target directory. This creates a race condition, since they're both attempting to write different contents to the same targetFile.

* If useCache == false, download file to targetDir directly.
*/
def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
timestamp: Long, useCache: Boolean) {
Contributor
style should be

def fetchCachedFile(
    url: String,
    targetDir: File,
    ...
    useCache: Boolean) {
  ...
}

@andrewor14 (Contributor)

Yes, it does seem like a problem if multiple simultaneous applications share the same files. Did we handle that even before this patch? I haven't dug deep into this, but should we have some kind of application-specific directory for fetching files?

@JoshRosen (Contributor)

@andrewor14 I don't think that it was a problem before, but the reason is perhaps a little subtle:

The old fetchFile has a workflow where it first downloads the file to a temporary file and then moves that temporary file to its final destination. Although the parent directory of the temporary files (spark.local.dir) is shared by all executors, the actual temporary file is created through File.createTempFile, so it should have a unique name. After downloading the file, fetchFile moves it to targetDir and renames it. When fetching a file on an executor, targetDir is SparkFiles.getRootDirectory, which is a per-application temporary directory, so there's no potential for cross-application conflicts.

This PR uses that same code path to perform the actual download. The potential conflict occurs because targetDir is localDir when downloading a file that's not present in the cache.
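
In sketch form, the pre-existing workflow described above:

    import java.io.File
    import com.google.common.io.Files

    // spark.local.dir is shared across applications, but File.createTempFile
    // guarantees a unique temp name there; the final move lands in targetDir,
    // which is per-application, so applications cannot clobber each other.
    def downloadThenMove(localDir: File, targetDir: File, fileName: String)
        (download: File => Unit): File = {
      val tempFile = File.createTempFile("fetchFileTemp", null, localDir)
      download(tempFile)
      val targetFile = new File(targetDir, fileName)
      Files.move(tempFile, targetFile)
      targetFile
    }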

@li-zhihui (Contributor, Author)

@JoshRosen @andrewor14

I tested the patch in YARN mode, where localDir is a per-application temporary directory, so now I see that this is a problem in standalone (and Mesos) mode.
targetDir (from SparkFiles.getRootDirectory) is a per-executor temporary directory (in my case, /home/frank/hdfs/yarn/nm-local-dir/usercache/frank/appcache/application_1409795343243_0002/container_1409795343243_0002_01_000126/./); I thought we could use targetDir + "../" as a per-application directory to hold the cached file (this solution was later abandoned).

BTW: the timestamp follows the logic in https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L323, although I don't understand why the timestamp could change during an application's lifetime.

* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
*
* If `useCache` is true, first attempts to fetch the file to a local cache that's shared
* across executors running the same application. `useCache` is used mainly for
* the the executors, not in local mode.
Contributor
the the

Contributor
"and" not in local mode

li-zhihui (Contributor, Author)
Done, thanks.

@SparkQA

SparkQA commented Oct 8, 2014

QA tests have started for PR 1616 at commit 935fed6.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 8, 2014

QA tests have finished for PR 1616 at commit 935fed6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21447/

@li-zhihui (Contributor, Author)

@andrewor14 more comments?

Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
env.securityManager, hadoopConf, timestamp, useCache = true)
andrewor14 (Contributor)
I just noticed, if this isn't meant to be used in local mode, shouldn't this be useCache = !isLocal?

Contributor
If so, it would be good if you could add a small comment to explain here that the cache is not needed for local mode because there is no fetching involved.
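
That is, something like this at the call site (sketch):

    // In local mode the "executor" shares the driver's filesystem, so nothing
    // is fetched over the network and the cache buys nothing.
    Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
      env.securityManager, hadoopConf, timestamp, useCache = !isLocal)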

li-zhihui (Contributor, Author)
Thanks @andrewor14, done.

@andrewor14 (Contributor)

Hey yeah @li-zhihui sorry this slipped on our end. This LGTM except for the one comment I made just now. I think the intention is that executors running in local mode shouldn't have to use the cache, as expressed in your javadoc for fetchFile. The code does otherwise, however.

@SparkQA

SparkQA commented Oct 24, 2014

QA tests have started for PR 1616 at commit 36940df.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 24, 2014

Tests timed out for PR 1616 at commit 36940df after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22101/

@andrewor14 (Contributor)

retest this please

@SparkQA

SparkQA commented Oct 24, 2014

QA tests have started for PR 1616 at commit 36940df.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 24, 2014

QA tests have finished for PR 1616 at commit 36940df.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22108/

@li-zhihui (Contributor, Author)

@andrewor14 I guess the failure is unrelated to the patch, but I don't know why it failed again. Can you give me some advice?

@andrewor14 (Contributor)

retest this please

@andrewor14 (Contributor)

Yeah, PySpark tests are kinda flaky. There's no way this patch could have caused it.

@SparkQA

SparkQA commented Oct 24, 2014

Test build #22147 has started for PR 1616 at commit 36940df.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 24, 2014

Test build #22147 has finished for PR 1616 at commit 36940df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/22147/

@andrewor14 (Contributor)

Ok cool I'm merging this. Thanks @li-zhihui
