SPARK-4687. Add a recursive option to the addFile API #3670

Closed
sryza wants to merge 9 commits into master from sryza:sandy-spark-4687

Conversation

@sryza
Contributor

sryza commented Dec 11, 2014

This adds a recursive option to the addFile API to satisfy Hive's needs. It only allows specifying HDFS directories, which will be copied down to every executor.

There are a couple outstanding questions.

  • Should we allow specifying local dirs as well? The best way to do this would probably be to archive them. The drawback is that it would require a fair bit of code that I don't know of any current use cases for.
  • The addFiles implementation has a caching component that I don't entirely understand. What events are we caching between? AFAICT it's users calling addFile on the same file in the same app at different times? Do we want/need to add something similar for addDirectory?
  • The addFiles implementation will check to see if an added file already exists and has the same contents. I imagine we want the same behavior, so planning to add this unless people think otherwise.

I plan to add some tests if people are OK with the approach.
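
As a usage sketch (paths and app name are hypothetical), the proposed API would look roughly like this:

```scala
import java.io.File

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(new SparkConf().setAppName("addFile-recursive-sketch"))

// recursive = true permits a directory; per this patch it must live on HDFS
// (or another Hadoop-supported filesystem) and is copied to every executor.
sc.addFile("hdfs:///user/hive/aux-dir", recursive = true)

sc.parallelize(1 to 2).foreach { _ =>
  // On each executor, resolve the local copy by the directory's name.
  val localDir = new File(SparkFiles.get("aux-dir"))
  println(localDir.listFiles().map(_.getName).mkString(", "))
}
```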

@SparkQA

SparkQA commented Dec 11, 2014

Test build #24339 has started for PR 3670 at commit 57df37c.

  • This patch merges cleanly.

@pwendell
Contributor

Hey Sandy - could you explain the Hive use case in a bit more detail? (Sorry, JIRA is down, otherwise I would have commented there.) It's a bit inconsistent that this only works for remote directories, but adding local directories would be a pain. It would just be useful to better understand what is going on and whether there is a way we can facilitate this with a one-off change in Hive rather than adding this new API in Spark.

@SparkQA

SparkQA commented Dec 11, 2014

Test build #24339 has finished for PR 3670 at commit 57df37c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24339/
Test FAILed.

* Add a directory to be downloaded with this Spark job on every node.
* The `path` passed must be a directory in HDFS (or other Hadoop-supported
* filesystems). To access the directory in Spark jobs, use
* `SparkFiles.get(directoryName)` to find its download location.
Contributor

It would be good to say that the directory will be recursively copied into the root of Spark's scratch directory.
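
One way the doc block above could absorb that note (a sketch, not the wording that was merged):

```scala
/**
 * Add a directory to be downloaded with this Spark job on every node.
 * The `path` passed must be a directory in HDFS (or other Hadoop-supported
 * filesystems). The directory is recursively copied into the root of
 * Spark's scratch directory on each node; to access it in Spark jobs, use
 * `SparkFiles.get(directoryName)` to find its download location.
 */
```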

@pwendell
Contributor

pwendell commented Jan 6, 2015

I think it's fine to add this - it would be good to have a more fully fleshed-out version including tests, etc. To answer your questions:

Should we allow specifying local dirs as well? The best way to do this would probably be to archive them. The drawback is that it would require a fair bit of code that I don't know of any current use cases for.

IMO - we can punt on adding local directories and add that later if desired. For now, we should throw an exception if a file:// URI, or a URI with no scheme (when there is no default FS), is passed.

The addFiles implementation has a caching component that I don't entirely understand. What events are we caching between? AFAICT it's users calling addFile on the same file in the same app at different times? Do we want/need to add something similar for addDirectory?

The addFiles implementation will check to see if an added file already exists and has the same contents. I imagine we want the same behavior, so planning to add this unless people think otherwise.

This only answers part of your question, but we need to decide what the semantics of calling addDirectory are if the contents changed. I think that for addFile we support this - i.e. you can add a file, then change its contents, then re-add it, and we make sure to invalidate stale copies on the executors. We detect this by checking whether the timestamp changed. This seems hard to do for directories. To do it properly we'd need to check whether any of the files changed and also do some type of diff to efficiently ship only the new files. So maybe we should just fail fast and throw an exception if addDirectory is called twice and the underlying directory, or any of its recursive contents, have mutated (@JoshRosen - what do you think of that)?

Popping up a bit, I wonder if the user API should still be called addFile and we should just add a recursive flag that will allow you to add directories. And we'd just say in the doc that the semantics are more restrictive for directories: (a) the directory needs to be in a globally visible filesystem, and (b) you cannot add the same directory twice - i.e. we make an immutable copy the first time it is added.
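
A rough sketch of the fail-fast checks floated above (names are hypothetical and the default-FS case is ignored for brevity; this is not the code that eventually landed):

```scala
import java.net.URI

import scala.collection.mutable

// Hypothetical guards: reject local or scheme-less URIs for directories,
// and refuse to re-add a directory once it has been registered.
val addedDirectories = mutable.Set[String]()

def validateAddDirectory(path: String): Unit = {
  val scheme = Option(new URI(path).getScheme).getOrElse("")
  if (scheme.isEmpty || scheme == "file") {
    throw new IllegalArgumentException(
      s"Directories must be on a globally visible filesystem: $path")
  }
  if (!addedDirectories.add(path)) {
    throw new IllegalArgumentException(
      s"Directory $path was already added; added directories are immutable")
  }
}
```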

@sryza
Contributor Author

sryza commented Jan 7, 2015

Popping up a bit, I wonder if the user API should still be called addFile and we should just add a flag recursive that will allow you to add directories.

"recursive" sounds a little weird to me in the sense that it implies adding a directory with recursive=false could be useful. Would it maybe make more sense to have an allowDirectories flag?

@JoshRosen
Contributor

A bit of archaeology reveals that the original motivation for worrying about overwriting files was that, in local mode, files added through addFile would be downloaded to the current working directory, so we wanted to prevent users from accidentally deleting the original copies of files by downloading files with the same names: mesos/spark#345. This was before we had the SparkFiles API, so at the time we assumed that user code would look in the CWD for added files, and we therefore didn't have an alternative to placing files in the local directory. Now that we have SparkFiles, though, I think we can remove this overwrite-protection logic, since it was originally guarding against Spark destroying users' source files, not against user behavior / errors.

It looks like we added the spark.files.overwrite setting to explicitly allow files to be overwritten with different contents (e.g. refreshing a file across all executors): fd833e7.
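
For context, that's a per-application SparkConf flag; enabling it looks something like:

```scala
import org.apache.spark.SparkConf

// With spark.files.overwrite enabled, re-adding a file whose contents have
// changed replaces the stale copies already fetched by executors.
val conf = new SparkConf()
  .setAppName("overwrite-demo")
  .set("spark.files.overwrite", "true")
```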

I guess the behavior for directories might be a bit different since you might want to also account for deletions (e.g. if I delete a file in the directory then re-add it, that file should probably be deleted on the executors as well).

RE: the recursive flag, I guess the idea here is something like cp and cp -r vs cpFile and cpDir?

@pwendell
Contributor

pwendell commented Jan 8, 2015

Yeah, my thought for recursive was to match the semantics of cp, which is the main file-copying interface I interact with. Calling cp on a directory without the recursive flag gives an error.

@pwendell
Contributor

pwendell commented Jan 8, 2015

I looked around a bit to try to find Java libraries that provide a recursive copy option, to see if there are conventions around the API. It turns out most of them don't - neither Guava nor Java 7's new file utilities have a recursive directory copy, probably because it's trickier to implement.
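
For illustration, here is roughly what such a routine looks like when built on Java 7's walkFileTree primitive; the simplifications here (no symlink, attribute, or error handling) are part of what makes a general-purpose version tricky:

```scala
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes

// Walk the source tree, mirroring each directory and copying each file
// into the corresponding location under the destination root.
def copyRecursively(src: Path, dst: Path): Unit = {
  Files.walkFileTree(src, new SimpleFileVisitor[Path] {
    override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes): FileVisitResult = {
      Files.createDirectories(dst.resolve(src.relativize(dir).toString))
      FileVisitResult.CONTINUE
    }
    override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
      Files.copy(file, dst.resolve(src.relativize(file).toString))
      FileVisitResult.CONTINUE
    }
  })
}
```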

@sryza
Contributor Author

sryza commented Jan 14, 2015

Uploading a further-along work-in-progress patch that switches to addFile with recursive. Still to do:

  • Support useCache, i.e. copy from local instead of remote.
  • Write tests that work. I'm a little unsure of how to approach this. I wrote a test that works by using the Hadoop FileSystem API to read local files. But now that I've merged addFile with addDirectory, files with scheme "file:/" go through the HTTP server path. There's a MiniDFSCluster API I could possibly use to support hdfs:/ schemes, but it's relatively heavyweight.

I could use help on the second one @pwendell @JoshRosen if either of you have any ideas.

@SparkQA

SparkQA commented Jan 14, 2015

Test build #25556 has started for PR 3670 at commit f78a416.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 14, 2015

Test build #25556 has finished for PR 3670 at commit f78a416.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25556/
Test FAILed.

@SparkQA

SparkQA commented Jan 14, 2015

Test build #25562 has started for PR 3670 at commit 8413c50.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 14, 2015

Test build #25562 has finished for PR 3670 at commit 8413c50.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25562/
Test FAILed.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25650 has started for PR 3670 at commit 12cac14.

  • This patch merges cleanly.

@sryza
Contributor Author

sryza commented Jan 16, 2015

OK, here's a version that's ready for review. It still needs a little more doc, polish, and a test or two, but I'd like to get validation on the approach.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25650 has finished for PR 3670 at commit 12cac14.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25650/
Test FAILed.

@SparkQA

SparkQA commented Jan 20, 2015

Test build #25795 has started for PR 3670 at commit 3d7af57.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 20, 2015

Test build #25795 has finished for PR 3670 at commit 3d7af57.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25795/
Test FAILed.

@SparkQA

SparkQA commented Jan 26, 2015

Test build #26111 has started for PR 3670 at commit 710cbd9.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 26, 2015

Test build #26111 has finished for PR 3670 at commit 710cbd9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])

@SparkQA

SparkQA commented Feb 2, 2015

Test build #26521 has finished for PR 3670 at commit 70cd24d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26521/
Test PASSed.

* supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
val isLocalMode = conf.get("spark.master").startsWith("local")
Contributor

I think that we already have a SparkContext.isLocal variable for this, so I'd use that instead of inspecting SparkConf here.
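
i.e., something along these lines (inside SparkContext, where isLocal is in scope):

```scala
// Prefer the existing field over re-parsing the master URL from the conf.
val isLocalMode = isLocal
// rather than: conf.get("spark.master").startsWith("local")
```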

@sryza
Contributor Author

sryza commented Feb 3, 2015

The updated patch addresses @JoshRosen's comments.

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26563 has started for PR 3670 at commit f9fc77f.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 3, 2015

Test build #26563 has finished for PR 3670 at commit f9fc77f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26563/
Test FAILed.

@JoshRosen
Contributor

To briefly address the questions @sryza raised in the PR description:

Should we allow specifying local dirs as well? The best way to do this would probably be to archive them. The drawback is that it would require a fair bit of code that I don't know of any current use cases for.

The user-facing API doesn't need to change in order to support this, so we could always choose to add this functionality later if we decide that there's a use-case for it.

The addFiles implementation has a caching component that I don't entirely understand. What events are we caching between? AFAICT it's users calling addFile on the same file in the same app at different times? Do we want/need to add something similar for addDirectory.

There are a few different layers of caching here. In all Spark versions, individual executors cache added files, so files will only be downloaded by the first task that requires them. In Spark 1.2.0+, we added a per-host, per-application cache that allows executors for the same application running on the same host to share downloaded files; this is useful in scenarios where you have a huge number of executors per host (see #1616 for more details). Utils.fetchFile handles the lock-file management to ensure that we only download files once per host, so I think we should already get that for free when downloading directories.
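
A loose sketch of that lock-file pattern (a simplification for illustration, not Utils.fetchFile itself):

```scala
import java.io.{File, RandomAccessFile}

// Executors on one host contend for a lock file; the winner downloads into
// the shared per-host cache, and everyone else reuses the cached copy.
def fetchOncePerHost(cacheDir: File, name: String)(download: File => Unit): File = {
  val cached = new File(cacheDir, name)
  val raf = new RandomAccessFile(new File(cacheDir, name + ".lock"), "rw")
  val lock = raf.getChannel.lock() // blocks until the current holder releases
  try {
    if (!cached.exists()) download(cached)
  } finally {
    lock.release()
    raf.close()
  }
  cached
}
```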

The addFiles implementation will check to see if an added file already exists and has the same contents. I imagine we want the same behavior, so planning to add this unless people think otherwise.

This seems fine, although I guess it could be a lot more expensive to perform this check for a directory than a single file.

@sryza changed the title from "SPARK-4687. [WIP] Add an addDirectory API" to "SPARK-4687. Add a recursive option to the addFile API" on Feb 3, 2015
@JoshRosen
Contributor

LGTM. There are additional enhancements that we could make to this in the future, such as PySpark support, support for adding local directories, etc., but merging this PR should not block on them: the design here is sufficient for Hive on Spark's needs and doesn't preclude these future enhancements.

@pwendell
Contributor

pwendell commented Feb 5, 2015

Jenkins, retest this please.

@SparkQA

SparkQA commented Feb 5, 2015

Test build #26826 has started for PR 3670 at commit f9fc77f.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 5, 2015

Test build #26826 has finished for PR 3670 at commit f9fc77f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26826/
Test PASSed.

@JoshRosen
Contributor

Since this looks good to me, I'm going to merge it into master (1.4.0) and branch-1.3 (1.3.0). Thanks @sryza!

asfgit pushed a commit that referenced this pull request Feb 5, 2015
This adds a recursive option to the addFile API to satisfy Hive's needs.  It only allows specifying HDFS dirs that will be copied down on every executor.

There are a couple outstanding questions.
* Should we allow specifying local dirs as well?  The best way to do this would probably be to archive them.  The drawback is that it would require a fair bit of code that I don't know of any current use cases for.
* The addFiles implementation has a caching component that I don't entirely understand.  What events are we caching between?  AFAICT it's users calling addFile on the same file in the same app at different times?  Do we want/need to add something similar for addDirectory.
*  The addFiles implementation will check to see if an added file already exists and has the same contents.  I imagine we want the same behavior, so planning to add this unless people think otherwise.

I plan to add some tests if people are OK with the approach.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3670 from sryza/sandy-spark-4687 and squashes the following commits:

f9fc77f [Sandy Ryza] Josh's comments
70cd24d [Sandy Ryza] Add another test
13da824 [Sandy Ryza] Revert executor changes
38bf94d [Sandy Ryza] Marcelo's comments
ca83849 [Sandy Ryza] Add addFile test
1941be3 [Sandy Ryza] Fix test and avoid HTTP server in local mode
31f15a9 [Sandy Ryza] Use cache recursively and fix some compile errors
0239c3d [Sandy Ryza] Change addDirectory to addFile with recursive
46fe70a [Sandy Ryza] SPARK-4687. Add a addDirectory API

(cherry picked from commit c4b1108)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
@asfgit asfgit closed this in c4b1108 Feb 5, 2015
@sryza
Contributor Author

sryza commented Feb 5, 2015

Thanks @JoshRosen !
