Fix deletion of files in current working directory by clearFiles() #345

Merged (4 commits) on Dec 29, 2012
9 changes: 4 additions & 5 deletions core/src/main/scala/spark/SparkContext.scala
@@ -419,8 +419,9 @@ class SparkContext(
}
addedFiles(key) = System.currentTimeMillis

// Fetch the file locally in case the task is executed locally
val filename = new File(path.split("/").last)
// Fetch the file locally in case a job is executed locally.
// Jobs that run through LocalScheduler will already fetch the required dependencies,
// but jobs run in DAGScheduler.runLocally() will not, so we must fetch the files here.
Utils.fetchFile(path, new File("."))

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
@@ -437,11 +438,10 @@ class SparkContext(
}

/**
* Clear the job's list of files added by `addFile` so that they do not get donwloaded to
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
def clearFiles() {
addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedFiles.clear()
}

@@ -465,7 +465,6 @@ class SparkContext(
* any new nodes.
*/
def clearJars() {
addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
addedJars.clear()
}

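Note on the SparkContext change above: before this patch, clearFiles() and clearJars() also deleted the matching files from the driver's current working directory, which could destroy a user's local copy of a file passed to addFile. A minimal driver-side sketch of the new behavior, assuming a local master; the file name data.txt is purely illustrative:

import spark.SparkContext

object ClearFilesExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "clearFiles example")
    // "data.txt" is a hypothetical file sitting in the driver's working directory.
    sc.addFile("data.txt")
    // ... run jobs that read the file from each node's working directory ...
    // With this patch, clearFiles() only clears the bookkeeping map; it no longer
    // deletes ./data.txt from the driver's current working directory.
    sc.clearFiles()
    sc.stop()
  }
}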
60 changes: 45 additions & 15 deletions core/src/main/scala/spark/Utils.scala
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.io.Source
import com.google.common.io.Files

/**
* Various utility methods used by Spark.
@@ -127,40 +128,69 @@ private object Utils extends Logging {
/**
* Download a file requested by the executor. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
def fetchFile(url: String, targetDir: File) {
val filename = url.split("/").last
val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
uri.getScheme match {
case "http" | "https" | "ftp" =>
logInfo("Fetching " + url + " to " + targetFile)
logInfo("Fetching " + url + " to " + tempFile)
val in = new URL(url).openStream()
val out = new FileOutputStream(targetFile)
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
tempFile.delete()
throw new SparkException("File " + targetFile + " exists and does not match contents of" +
" " + url)
} else {
Files.move(tempFile, targetFile)
}
case "file" | null =>
// Remove the file if it already exists
targetFile.delete()
// Symlink the file locally.
if (uri.isAbsolute) {
// url is absolute, i.e. it starts with "file:///". Extract the source
// file's absolute path from the url.
val sourceFile = new File(uri)
logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
val sourceFile = if (uri.isAbsolute) {
new File(uri)
} else {
new File(url)
}
if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
throw new SparkException("File " + targetFile + " exists and does not match contents of" +
" " + url)
} else {
// url is not absolute, i.e. itself is the path to the source file.
logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
FileUtil.symLink(url, targetFile.getAbsolutePath)
// Remove the file if it already exists
targetFile.delete()
// Symlink the file locally.
if (uri.isAbsolute) {
// url is absolute, i.e. it starts with "file:///". Extract the source
// file's absolute path from the url.
val sourceFile = new File(uri)
logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
} else {
// url is not absolute, i.e. itself is the path to the source file.
logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
FileUtil.symLink(url, targetFile.getAbsolutePath)
}
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val uri = new URI(url)
val conf = new Configuration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(targetFile)
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
tempFile.delete()
throw new SparkException("File " + targetFile + " exists and does not match contents of" +
" " + url)
} else {
Files.move(tempFile, targetFile)
}
}
// Decompress the file if it's a .tar or .tar.gz
if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
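The rewritten fetchFile above downloads http/https/ftp and Hadoop-filesystem files to a temporary file first, compares against any existing target with Guava's Files.equal, and only moves the download into place if the contents match (the local file: case does the same check before symlinking). A self-contained sketch of that compare-then-move step; the helper name is made up and RuntimeException stands in for SparkException so the snippet compiles outside Spark:

import java.io.File
import com.google.common.io.Files

object FetchPattern {
  // Move tempFile into place as targetFile, refusing to overwrite different contents.
  def moveIfConsistent(tempFile: File, targetFile: File, url: String) {
    if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
      tempFile.delete()
      throw new RuntimeException(
        "File " + targetFile + " exists and does not match contents of " + url)
    } else {
      Files.move(tempFile, targetFile)
    }
  }
}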
34 changes: 34 additions & 0 deletions core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -301,6 +301,40 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* (in that order of preference). If neither of these is set, return None.
*/
def getSparkHome(): Option[String] = sc.getSparkHome()

/**
* Add a file to be downloaded into the working directory of this Spark job on every node.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI.
*/
def addFile(path: String) {
sc.addFile(path)
}

/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI.
*/
def addJar(path: String) {
sc.addJar(path)
}

/**
* Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
* any new nodes.
*/
def clearJars() {
sc.clearJars()
}

/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
def clearFiles() {
sc.clearFiles()
}
}

object JavaSparkContext {
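The new JavaSparkContext methods simply delegate to the wrapped SparkContext. A short sketch of exercising them, written in Scala for consistency with the rest of this page; the wrapped-context constructor comes from the class signature above, and the dependency paths are hypothetical:

import spark.SparkContext
import spark.api.java.JavaSparkContext

object JavaApiExample {
  def main(args: Array[String]) {
    // Wrap an existing SparkContext, matching the primary constructor shown above.
    val jsc = new JavaSparkContext(new SparkContext("local", "java api example"))
    // Hypothetical dependency paths; local files, HDFS paths, or HTTP/HTTPS/FTP URIs all work.
    jsc.addFile("data.txt")
    jsc.addJar("helper.jar")
    // Forget the dependencies for any new nodes without deleting the local copies.
    jsc.clearFiles()
    jsc.clearJars()
    jsc.sc.stop()
  }
}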
34 changes: 18 additions & 16 deletions core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -108,22 +108,24 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File("."))
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File("."))
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
val url = new File(".", localName).toURI.toURL
if (!classLoader.getURLs.contains(url)) {
logInfo("Adding " + url + " to class loader")
classLoader.addURL(url)
synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File("."))
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File("."))
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
val url = new File(".", localName).toURI.toURL
if (!classLoader.getURLs.contains(url)) {
logInfo("Adding " + url + " to class loader")
classLoader.addURL(url)
}
}
}
}
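Wrapping updateDependencies in synchronized matters because LocalScheduler runs tasks on a thread pool, so several task threads may race to fetch dependencies and mutate currentFiles, currentJars, and the class loader. A standalone sketch of the same locking pattern; the tracker class and fetch callback are illustrative, not part of the patch:

import scala.collection.mutable.HashMap

// Illustrative stand-in for the scheduler-side bookkeeping; not the actual LocalScheduler.
class DependencyTracker {
  private val currentFiles = new HashMap[String, Long]

  // Called from multiple task threads; the lock keeps fetches and map updates atomic.
  def update(newFiles: HashMap[String, Long])(fetch: String => Unit) {
    synchronized {
      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
        fetch(name) // e.g. Utils.fetchFile(name, new File("."))
        currentFiles(name) = timestamp
      }
    }
  }
}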