Skip to content

[SPARK-4896] don’t redundantly overwrite executor JAR deps #2848

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 107 additions & 63 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,16 +385,12 @@ private[spark] object Utils extends Logging {
} finally {
lock.release()
}
if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
if (conf.getBoolean("spark.files.overwrite", false)) {
targetFile.delete()
logInfo((s"File $targetFile exists and does not match contents of $url, " +
s"replacing it with $url"))
} else {
throw new SparkException(s"File $targetFile exists and does not match contents of $url")
}
}
Files.copy(cachedFile, targetFile)
copyFile(
url,
cachedFile,
targetFile,
conf.getBoolean("spark.files.overwrite", false)
)
} else {
doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
}
Expand All @@ -411,6 +407,104 @@ private[spark] object Utils extends Logging {
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
}

/**
* Download `in` to `tempFile`, then move it to `destFile`.
*
* If `destFile` already exists:
* - no-op if its contents equal those of `sourceFile`,
* - throw an exception if `fileOverwrite` is false,
* - attempt to overwrite it otherwise.
*
* @param url URL that `sourceFile` originated from, for logging purposes.
* @param in InputStream to download.
* @param tempFile File path to download `in` to.
* @param destFile File path to move `tempFile` to.
* @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
* `sourceFile`
*/
private def downloadFile(
url: String,
in: InputStream,
tempFile: File,
destFile: File,
fileOverwrite: Boolean): Unit = {

try {
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, closeStreams = true)
copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true)
} finally {
// Catch-all for the couple of cases where for some reason we didn't move `tempFile` to
// `destFile`.
if (tempFile.exists()) {
tempFile.delete()
}
}
}

/**
* Copy `sourceFile` to `destFile`.
*
* If `destFile` already exists:
* - no-op if its contents equal those of `sourceFile`,
* - throw an exception if `fileOverwrite` is false,
* - attempt to overwrite it otherwise.
*
* @param url URL that `sourceFile` originated from, for logging purposes.
* @param sourceFile File path to copy/move from.
* @param destFile File path to copy/move to.
* @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
* `sourceFile`
* @param removeSourceFile Whether to remove `sourceFile` after / as part of moving/copying it to
* `destFile`.
*/
private def copyFile(
url: String,
sourceFile: File,
destFile: File,
fileOverwrite: Boolean,
removeSourceFile: Boolean = false): Unit = {

if (destFile.exists) {
if (!Files.equal(sourceFile, destFile)) {
if (fileOverwrite) {
logInfo(
s"File $destFile exists and does not match contents of $url, replacing it with $url"
)
if (!destFile.delete()) {
throw new SparkException(
"Failed to delete %s while attempting to overwrite it with %s".format(
destFile.getAbsolutePath,
sourceFile.getAbsolutePath
)
)
}
} else {
throw new SparkException(
s"File $destFile exists and does not match contents of $url")
}
} else {
// Do nothing if the file contents are the same, i.e. this file has been copied
// previously.
logInfo(
"%s has been previously copied to %s".format(
sourceFile.getAbsolutePath,
destFile.getAbsolutePath
)
)
return
}
}

// The file does not exist in the target directory. Copy or move it there.
if (removeSourceFile) {
Files.move(sourceFile, destFile)
} else {
logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
Files.copy(sourceFile, destFile)
}
}

/**
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
Expand Down Expand Up @@ -449,67 +543,17 @@ private[spark] object Utils extends Logging {
uc.setReadTimeout(timeout)
uc.connect()
val in = uc.getInputStream()
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, closeStreams = true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
if (fileOverwrite) {
targetFile.delete()
logInfo(("File %s exists and does not match contents of %s, " +
"replacing it with %s").format(targetFile, url, url))
} else {
tempFile.delete()
throw new SparkException(
"File " + targetFile + " exists and does not match contents of" + " " + url)
}
}
Files.move(tempFile, targetFile)
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
case "file" =>
// In the case of a local file, copy the local file to the target directory.
// Note the difference between uri vs url.
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
var shouldCopy = true
if (targetFile.exists) {
if (!Files.equal(sourceFile, targetFile)) {
if (fileOverwrite) {
targetFile.delete()
logInfo(("File %s exists and does not match contents of %s, " +
"replacing it with %s").format(targetFile, url, url))
} else {
throw new SparkException(
"File " + targetFile + " exists and does not match contents of" + " " + url)
}
} else {
// Do nothing if the file contents are the same, i.e. this file has been copied
// previously.
logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+ targetFile.getAbsolutePath)
shouldCopy = false
}
}

if (shouldCopy) {
// The file does not exist in the target directory. Copy it there.
logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
Files.copy(sourceFile, targetFile)
}
copyFile(url, sourceFile, targetFile, fileOverwrite)
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, closeStreams = true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
if (fileOverwrite) {
targetFile.delete()
logInfo(("File %s exists and does not match contents of %s, " +
"replacing it with %s").format(targetFile, url, url))
} else {
tempFile.delete()
throw new SparkException(
"File " + targetFile + " exists and does not match contents of" + " " + url)
}
}
Files.move(tempFile, targetFile)
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
}
}

Expand Down