Skip to content

Commit 31f15a9

Browse files
committed
Use cache recursively and fix some compile errors
1 parent 0239c3d commit 31f15a9

File tree

3 files changed

+32
-4
lines changed

3 files changed

+32
-4
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private[spark] class Executor(
171171
startGCTime = gcTime
172172

173173
try {
174-
val (taskFiles, taskJars, taskDirs, taskBytes) =
174+
val (taskFiles, taskJars, taskBytes) =
175175
Task.deserializeWithDependencies(serializedTask)
176176
updateDependencies(taskFiles, taskJars)
177177
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ private[spark] object Utils extends Logging {
481481
removeSourceFile: Boolean = false): Unit = {
482482

483483
if (destFile.exists) {
484-
if (!Files.equal(sourceFile, destFile)) {
484+
if (!filesEqualRecursive(sourceFile, destFile)) {
485485
if (fileOverwrite) {
486486
logInfo(
487487
s"File $destFile exists and does not match contents of $url, replacing it with $url"
@@ -516,7 +516,34 @@ private[spark] object Utils extends Logging {
516516
Files.move(sourceFile, destFile)
517517
} else {
518518
logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
519-
Files.copy(sourceFile, destFile)
519+
copyRecursive(sourceFile, destFile)
520+
}
521+
}
522+
523+
private def filesEqualRecursive(file1: File, file2: File): Boolean = {
524+
if (file1.isDirectory && file2.isDirectory) {
525+
val subfiles1 = file1.listFiles()
526+
val subfiles2 = file2.listFiles()
527+
if (subfiles1.size != subfiles2.size) {
528+
return false
529+
}
530+
subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).dropWhile {
531+
case (f1, f2) => filesEqualRecursive(f1, f2)
532+
}.isEmpty
533+
} else if (file1.isFile && file2.isFile) {
534+
Files.equal(file1, file2)
535+
} else {
536+
false
537+
}
538+
}
539+
540+
private def copyRecursive(source: File, dest: File): Unit = {
541+
if (source.isDirectory) {
542+
dest.mkdir()
543+
val subfiles = source.listFiles()
544+
subfiles.foreach(f => copyRecursive(f, new File(dest, f.getName)))
545+
} else {
546+
Files.copy(source, dest)
520547
}
521548
}
522549

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
package org.apache.spark
1919

20+
import java.io.File
21+
2022
import org.scalatest.FunSuite
2123

2224
import org.apache.hadoop.io.BytesWritable
23-
import java.io.File
2425

2526
class SparkContextSuite extends FunSuite with LocalSparkContext {
2627

0 commit comments

Comments
 (0)