@@ -385,16 +385,12 @@ private[spark] object Utils extends Logging {
385
385
} finally {
386
386
lock.release()
387
387
}
388
- if (targetFile.exists && ! Files .equal(cachedFile, targetFile)) {
389
- if (conf.getBoolean(" spark.files.overwrite" , false )) {
390
- targetFile.delete()
391
- logInfo((s " File $targetFile exists and does not match contents of $url, " +
392
- s " replacing it with $url" ))
393
- } else {
394
- throw new SparkException (s " File $targetFile exists and does not match contents of $url" )
395
- }
396
- }
397
- Files .copy(cachedFile, targetFile)
388
+ copyFile(
389
+ url,
390
+ cachedFile,
391
+ targetFile,
392
+ conf.getBoolean(" spark.files.overwrite" , false )
393
+ )
398
394
} else {
399
395
doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
400
396
}
@@ -411,6 +407,104 @@ private[spark] object Utils extends Logging {
411
407
FileUtil .chmod(targetFile.getAbsolutePath, " a+x" )
412
408
}
413
409
410
+ /**
411
+ * Download `in` to `tempFile`, then move it to `destFile`.
412
+ *
413
+ * If `destFile` already exists:
414
+ * - no-op if its contents equal those of `sourceFile`,
415
+ * - throw an exception if `fileOverwrite` is false,
416
+ * - attempt to overwrite it otherwise.
417
+ *
418
+ * @param url URL that `sourceFile` originated from, for logging purposes.
419
+ * @param in InputStream to download.
420
+ * @param tempFile File path to download `in` to.
421
+ * @param destFile File path to move `tempFile` to.
422
+ * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
423
+ * `sourceFile`
424
+ */
425
+ private def downloadFile (
426
+ url : String ,
427
+ in : InputStream ,
428
+ tempFile : File ,
429
+ destFile : File ,
430
+ fileOverwrite : Boolean ): Unit = {
431
+
432
+ try {
433
+ val out = new FileOutputStream (tempFile)
434
+ Utils .copyStream(in, out, closeStreams = true )
435
+ copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true )
436
+ } finally {
437
+ // Catch-all for the couple of cases where for some reason we didn't move `tempFile` to
438
+ // `destFile`.
439
+ if (tempFile.exists()) {
440
+ tempFile.delete()
441
+ }
442
+ }
443
+ }
444
+
445
+ /**
446
+ * Copy `sourceFile` to `destFile`.
447
+ *
448
+ * If `destFile` already exists:
449
+ * - no-op if its contents equal those of `sourceFile`,
450
+ * - throw an exception if `fileOverwrite` is false,
451
+ * - attempt to overwrite it otherwise.
452
+ *
453
+ * @param url URL that `sourceFile` originated from, for logging purposes.
454
+ * @param sourceFile File path to copy/move from.
455
+ * @param destFile File path to copy/move to.
456
+ * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
457
+ * `sourceFile`
458
+ * @param removeSourceFile Whether to remove `sourceFile` after / as part of moving/copying it to
459
+ * `destFile`.
460
+ */
461
+ private def copyFile (
462
+ url : String ,
463
+ sourceFile : File ,
464
+ destFile : File ,
465
+ fileOverwrite : Boolean ,
466
+ removeSourceFile : Boolean = false ): Unit = {
467
+
468
+ if (destFile.exists) {
469
+ if (! Files .equal(sourceFile, destFile)) {
470
+ if (fileOverwrite) {
471
+ logInfo(
472
+ s " File $destFile exists and does not match contents of $url, replacing it with $url"
473
+ )
474
+ if (! destFile.delete()) {
475
+ throw new SparkException (
476
+ " Failed to delete %s while attempting to overwrite it with %s" .format(
477
+ destFile.getAbsolutePath,
478
+ sourceFile.getAbsolutePath
479
+ )
480
+ )
481
+ }
482
+ } else {
483
+ throw new SparkException (
484
+ s " File $destFile exists and does not match contents of $url" )
485
+ }
486
+ } else {
487
+ // Do nothing if the file contents are the same, i.e. this file has been copied
488
+ // previously.
489
+ logInfo(
490
+ " %s has been previously copied to %s" .format(
491
+ sourceFile.getAbsolutePath,
492
+ destFile.getAbsolutePath
493
+ )
494
+ )
495
+ return
496
+ }
497
+ }
498
+
499
+ // The file does not exist in the target directory. Copy or move it there.
500
+ if (removeSourceFile) {
501
+ Files .move(sourceFile, destFile)
502
+ } else {
503
+ logInfo(s " Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}" )
504
+ Files .copy(sourceFile, destFile)
505
+ }
506
+ }
507
+
414
508
/**
415
509
* Download a file to target directory. Supports fetching the file in a variety of ways,
416
510
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
@@ -449,67 +543,17 @@ private[spark] object Utils extends Logging {
449
543
uc.setReadTimeout(timeout)
450
544
uc.connect()
451
545
val in = uc.getInputStream()
452
- val out = new FileOutputStream (tempFile)
453
- Utils .copyStream(in, out, closeStreams = true )
454
- if (targetFile.exists && ! Files .equal(tempFile, targetFile)) {
455
- if (fileOverwrite) {
456
- targetFile.delete()
457
- logInfo((" File %s exists and does not match contents of %s, " +
458
- " replacing it with %s" ).format(targetFile, url, url))
459
- } else {
460
- tempFile.delete()
461
- throw new SparkException (
462
- " File " + targetFile + " exists and does not match contents of" + " " + url)
463
- }
464
- }
465
- Files .move(tempFile, targetFile)
546
+ downloadFile(url, in, tempFile, targetFile, fileOverwrite)
466
547
case " file" =>
467
548
// In the case of a local file, copy the local file to the target directory.
468
549
// Note the difference between uri vs url.
469
550
val sourceFile = if (uri.isAbsolute) new File (uri) else new File (url)
470
- var shouldCopy = true
471
- if (targetFile.exists) {
472
- if (! Files .equal(sourceFile, targetFile)) {
473
- if (fileOverwrite) {
474
- targetFile.delete()
475
- logInfo((" File %s exists and does not match contents of %s, " +
476
- " replacing it with %s" ).format(targetFile, url, url))
477
- } else {
478
- throw new SparkException (
479
- " File " + targetFile + " exists and does not match contents of" + " " + url)
480
- }
481
- } else {
482
- // Do nothing if the file contents are the same, i.e. this file has been copied
483
- // previously.
484
- logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
485
- + targetFile.getAbsolutePath)
486
- shouldCopy = false
487
- }
488
- }
489
-
490
- if (shouldCopy) {
491
- // The file does not exist in the target directory. Copy it there.
492
- logInfo(" Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
493
- Files .copy(sourceFile, targetFile)
494
- }
551
+ copyFile(url, sourceFile, targetFile, fileOverwrite)
495
552
case _ =>
496
553
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
497
554
val fs = getHadoopFileSystem(uri, hadoopConf)
498
555
val in = fs.open(new Path (uri))
499
- val out = new FileOutputStream (tempFile)
500
- Utils .copyStream(in, out, closeStreams = true )
501
- if (targetFile.exists && ! Files .equal(tempFile, targetFile)) {
502
- if (fileOverwrite) {
503
- targetFile.delete()
504
- logInfo((" File %s exists and does not match contents of %s, " +
505
- " replacing it with %s" ).format(targetFile, url, url))
506
- } else {
507
- tempFile.delete()
508
- throw new SparkException (
509
- " File " + targetFile + " exists and does not match contents of" + " " + url)
510
- }
511
- }
512
- Files .move(tempFile, targetFile)
556
+ downloadFile(url, in, tempFile, targetFile, fileOverwrite)
513
557
}
514
558
}
515
559
0 commit comments