Skip to content

Commit fd833e7

Browse files
committed
Allow files added through SparkContext.addFile() to be overwritten
This is useful for cases in which a file needs to be refreshed and re-downloaded by the executors periodically.

Signed-off-by: Yinan Li <liyinan926@gmail.com>
1 parent aa981e4 commit fd833e7

File tree

2 files changed

+42
-15
lines changed

2 files changed

+42
-15
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -268,54 +268,73 @@ private[spark] object Utils extends Logging {
268268
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
269269
val targetFile = new File(targetDir, filename)
270270
val uri = new URI(url)
271+
val fileOverwrite = System.getProperty("spark.files.overwrite", "false").toBoolean
271272
uri.getScheme match {
272273
case "http" | "https" | "ftp" =>
273274
logInfo("Fetching " + url + " to " + tempFile)
274275
val in = new URL(url).openStream()
275276
val out = new FileOutputStream(tempFile)
276277
Utils.copyStream(in, out, true)
277278
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
278-
tempFile.delete()
279-
throw new SparkException(
280-
"File " + targetFile + " exists and does not match contents of" + " " + url)
281-
} else {
282-
Files.move(tempFile, targetFile)
279+
if (fileOverwrite) {
280+
targetFile.delete()
281+
logInfo(("File %s exists and does not match contents of %s, " +
282+
"replacing it with %s").format(targetFile, url, url))
283+
} else {
284+
tempFile.delete()
285+
throw new SparkException(
286+
"File " + targetFile + " exists and does not match contents of" + " " + url)
287+
}
283288
}
289+
Files.move(tempFile, targetFile)
284290
case "file" | null =>
285291
// In the case of a local file, copy the local file to the target directory.
286292
// Note the difference between uri vs url.
287293
val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
294+
var shouldCopy = true
288295
if (targetFile.exists) {
289-
// If the target file already exists, warn the user if
290296
if (!Files.equal(sourceFile, targetFile)) {
291-
throw new SparkException(
292-
"File " + targetFile + " exists and does not match contents of" + " " + url)
297+
if (fileOverwrite) {
298+
targetFile.delete()
299+
logInfo(("File %s exists and does not match contents of %s, " +
300+
"replacing it with %s").format(targetFile, url, url))
301+
} else {
302+
throw new SparkException(
303+
"File " + targetFile + " exists and does not match contents of" + " " + url)
304+
}
293305
} else {
294306
// Do nothing if the file contents are the same, i.e. this file has been copied
295307
// previously.
296308
logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
297309
+ targetFile.getAbsolutePath)
310+
shouldCopy = false
298311
}
299-
} else {
312+
}
313+
314+
if (shouldCopy) {
300315
// The file does not exist in the target directory. Copy it there.
301316
logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
302317
Files.copy(sourceFile, targetFile)
303318
}
304319
case _ =>
305320
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
306-
val uri = new URI(url)
307321
val conf = SparkHadoopUtil.get.newConfiguration()
308322
val fs = FileSystem.get(uri, conf)
309323
val in = fs.open(new Path(uri))
310324
val out = new FileOutputStream(tempFile)
311325
Utils.copyStream(in, out, true)
312326
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
313-
tempFile.delete()
314-
throw new SparkException("File " + targetFile + " exists and does not match contents of" +
315-
" " + url)
316-
} else {
317-
Files.move(tempFile, targetFile)
327+
if (fileOverwrite) {
328+
targetFile.delete()
329+
logInfo(("File %s exists and does not match contents of %s, " +
330+
"replacing it with %s").format(targetFile, url, url))
331+
} else {
332+
tempFile.delete()
333+
throw new SparkException(
334+
"File " + targetFile + " exists and does not match contents of" + " " + url)
335+
}
318336
}
337+
Files.move(tempFile, targetFile)
319338
}
320339
// Decompress the file if it's a .tar or .tar.gz
321340
if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {

docs/configuration.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ Apart from these, the following properties are also available, and may be useful
431431
</td>
432432
</tr>
433433
<tr>
434435
<td>spark.logConf</td>
435436
<td>false</td>
436437
<td>
@@ -459,6 +460,13 @@ Apart from these, the following properties are also available, and may be useful
459460
the whole cluster by default. <br/>
460461
<b>Note:</b> this setting needs to be configured in the standalone cluster master, not in individual
461462
applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
463+
</td>
464+
</tr>
465+
<tr>
466+
<td>spark.files.overwrite</td>
467+
<td>false</td>
468+
<td>
469+
Whether to overwrite files added through <code>SparkContext.addFile()</code> when the target file already exists and its contents do not match those of the source.
462470
</td>
463471
</tr>
464472
</table>

0 commit comments

Comments
 (0)