Commit f9fc77f

Josh's comments
1 parent 70cd24d commit f9fc77f

3 files changed: +26 −34 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 4 deletions
@@ -1018,7 +1018,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val isLocalMode = conf.get("spark.master").startsWith("local")
     val uri = new URI(path)
     val schemeCorrectedPath = uri.getScheme match {
       case null | "local" => "file:" + uri.getPath
@@ -1030,10 +1029,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     if (!Array("http", "https", "ftp").contains(scheme)) {
       val fs = hadoopPath.getFileSystem(hadoopConfiguration)
       if (!fs.exists(hadoopPath)) {
-        throw new SparkException(s"Added file $hadoopPath does not exist.")
+        throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
       }
       val isDir = fs.isDirectory(hadoopPath)
-      if (!isLocalMode && scheme == "file" && isDir) {
+      if (!isLocal && scheme == "file" && isDir) {
         throw new SparkException(s"addFile does not support local directories when not running " +
           "local mode.")
       }
@@ -1043,7 +1042,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
     }

-    val key = if (!isLocalMode && scheme == "file") {
+    val key = if (!isLocal && scheme == "file") {
       env.httpFileServer.addFile(new File(uri.getPath))
     } else {
       schemeCorrectedPath
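
The net effect of these three hunks: addFile reuses the context's existing isLocal flag instead of re-deriving local mode from the spark.master setting, and a missing file now surfaces as the more specific FileNotFoundException. A minimal usage sketch of the API being patched (the paths and file names here are hypothetical, not taken from the diff):

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
sc.addFile("/tmp/configs", recursive = true)  // directories require recursive = true
sc.addFile("/tmp/configs/app.conf")           // single files need no flag

// Tasks resolve their local copy of a distributed file via SparkFiles:
val localCopy = SparkFiles.get("app.conf")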

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 6 additions & 4 deletions
@@ -442,7 +442,7 @@ private[spark] object Utils extends Logging {
       fileOverwrite: Boolean): Unit = {
     val tempFile = File.createTempFile("fetchFileTemp", null,
       new File(destFile.getParentFile.getAbsolutePath))
-    logInfo("Fetching " + url + " to " + tempFile)
+    logInfo(s"Fetching $url to $tempFile")

     try {
       val out = new FileOutputStream(tempFile)
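
The logging line above moves from string concatenation to Scala's s-interpolator, which embeds expressions directly in the literal. A tiny standalone check of the equivalence (values are hypothetical):

val url = "http://example.com/data.bin"
val tempFile = "/tmp/fetchFileTemp123"
// Both forms build the same string; the interpolated one is easier to read.
assert("Fetching " + url + " to " + tempFile == s"Fetching $url to $tempFile")
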
@@ -527,9 +527,9 @@ private[spark] object Utils extends Logging {
         if (subfiles1.size != subfiles2.size) {
           return false
         }
-        subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).dropWhile {
+        subfiles1.sortBy(_.getName).zip(subfiles2.sortBy(_.getName)).forall {
           case (f1, f2) => filesEqualRecursive(f1, f2)
-        }.isEmpty
+        }
       } else if (file1.isFile && file2.isFile) {
         Files.equal(file1, file2)
       } else {
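
The forall version states directly what dropWhile { ... }.isEmpty only implied: every name-aligned pair of children must compare equal, short-circuiting on the first mismatch. A standalone illustration:

// Both expressions answer "do all pairs match?", but forall says so directly.
val pairs = Seq((1, 1), (2, 2), (3, 4))
val viaDropWhile = pairs.dropWhile { case (a, b) => a == b }.isEmpty  // false
val viaForall    = pairs.forall { case (a, b) => a == b }             // false
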
@@ -611,7 +611,9 @@ private[spark] object Utils extends Logging {
       conf: SparkConf,
       hadoopConf: Configuration,
       fileOverwrite: Boolean): Unit = {
-    targetDir.mkdir()
+    if (!targetDir.mkdir()) {
+      throw new IOException(s"Failed to create directory ${targetDir.getPath}")
+    }
     fs.listStatus(path).foreach { fileStatus =>
       val innerPath = fileStatus.getPath
       if (fileStatus.isDir) {
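
The added check matters because java.io.File.mkdir() reports failure through its boolean return value rather than by throwing, so the old unchecked call could fail silently and leave later writes pointing at a nonexistent directory. The pattern in isolation (the path is hypothetical):

import java.io.{File, IOException}

val targetDir = new File("/tmp/fetch-dest")  // hypothetical destination
// mkdir() returns false when the directory could not be created
// (including when something already exists at that path).
if (!targetDir.mkdir()) {
  throw new IOException(s"Failed to create directory ${targetDir.getPath}")
}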

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 17 additions & 26 deletions
@@ -26,6 +26,8 @@ import org.scalatest.FunSuite

 import org.apache.hadoop.io.BytesWritable

+import org.apache.spark.util.Utils
+
 class SparkContextSuite extends FunSuite with LocalSparkContext {

   test("Only one SparkContext may be active at a time") {
@@ -79,7 +81,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
   }

   test("addFile works") {
-    val file = new File("somefile")
+    val file = File.createTempFile("someprefix", "somesuffix")
     val absolutePath = file.getAbsolutePath
     try {
       Files.write("somewords", file, UTF_8)
@@ -102,60 +104,49 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
       }).count()
     } finally {
       sc.stop()
-      file.delete()
     }
   }

   test("addFile recursive works") {
-    val pluto = new File("pluto")
-    val neptune = new File(pluto, "neptune")
-    val saturn = new File(neptune, "saturn")
-    val alien1 = new File(neptune, "alien1")
-    val alien2 = new File(saturn, "alien2")
+    val pluto = Utils.createTempDir()
+    val neptune = Utils.createTempDir(pluto.getAbsolutePath)
+    val saturn = Utils.createTempDir(neptune.getAbsolutePath)
+    val alien1 = File.createTempFile("alien", "1", neptune)
+    val alien2 = File.createTempFile("alien", "2", saturn)

     try {
-      assert(neptune.mkdirs())
-      assert(saturn.mkdir())
-      assert(alien1.createNewFile())
-      assert(alien2.createNewFile())
-
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
       sc.addFile(neptune.getAbsolutePath, true)
       sc.parallelize(Array(1), 1).map(x => {
         val sep = File.separator
-        if (!new File(SparkFiles.get("neptune" + sep + "alien1")).exists()) {
+        if (!new File(SparkFiles.get(neptune.getName + sep + alien1.getName)).exists()) {
           throw new SparkException("can't access file under root added directory")
         }
-        if (!new File(SparkFiles.get("neptune" + sep + "saturn" + sep + "alien2")).exists()) {
+        if (!new File(SparkFiles.get(neptune.getName + sep + saturn.getName + sep + alien2.getName))
+          .exists()) {
           throw new SparkException("can't access file in nested directory")
         }
-        if (new File(SparkFiles.get("pluto" + sep + "neptune" + sep + "alien1")).exists()) {
+        if (new File(SparkFiles.get(pluto.getName + sep + neptune.getName + sep + alien1.getName))
+          .exists()) {
           throw new SparkException("file exists that shouldn't")
         }
         x
       }).count()
     } finally {
       sc.stop()
-      alien2.delete()
-      saturn.delete()
-      alien1.delete()
-      neptune.delete()
-      pluto.delete()
     }
   }

   test("addFile recursive can't add directories by default") {
-    val dir = new File("dir")
+    val dir = Utils.createTempDir()

     try {
       sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
-      sc.addFile(dir.getAbsolutePath)
-      assert(false, "should have thrown exception")
-    } catch {
-      case _: SparkException =>
+      intercept[SparkException] {
+        sc.addFile(dir.getAbsolutePath)
+      }
     } finally {
       sc.stop()
-      dir.delete()
     }
   }
 }
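
Two test idioms replace the hand-rolled bookkeeping here: Utils.createTempDir registers the directory for deletion at JVM shutdown, which is why the manual delete() calls in the finally blocks can go, and ScalaTest's intercept fails the test unless the expected exception type is thrown. A minimal sketch of the intercept pattern in a FunSuite:

import org.scalatest.FunSuite

class InterceptExampleSuite extends FunSuite {
  test("intercept fails unless the expected exception is thrown") {
    // intercept returns the caught exception for further inspection;
    // if the block completes normally, the test fails automatically.
    val e = intercept[IllegalArgumentException] {
      require(false, "boom")
    }
    assert(e.getMessage.contains("boom"))
  }
}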
