
Commit 5771f54

Merge pull request apache#121 from mapr/mapr-25770-revert
Revert MAPR-25770
2 parents 99daf6b + 8e3b9ed commit 5771f54

13 files changed: +92 additions, −109 deletions

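The hunks below all apply the same change: the FileSystem.exists() pre-check in front of a filesystem call is dropped, and the call is made directly, with FileNotFoundException (or the boolean result of delete) covering the missing-path case. A minimal sketch of the two idioms, with illustrative names rather than code from the patch:

import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileSystem, Path}

object ExistsVsDirectCall {
  // Idiom being removed: probe with exists() before the real call.
  // It costs an extra filesystem round trip and is racy, since the path
  // can vanish between exists() and getFileStatus().
  def isDirWithPreCheck(fs: FileSystem, p: Path): Boolean = {
    if (!fs.exists(p)) {
      throw new FileNotFoundException(s"Path $p does not exist.")
    }
    fs.getFileStatus(p).isDirectory
  }

  // Idiom being introduced: call getFileStatus() directly and let it
  // raise FileNotFoundException itself when the path is missing.
  def isDirDirect(fs: FileSystem, p: Path): Boolean =
    fs.getFileStatus(p).isDirectory
}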

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 3 deletions
@@ -1450,9 +1450,6 @@ class SparkContext(config: SparkConf) extends Logging {
       val scheme = new URI(schemeCorrectedPath).getScheme
       if (!Array("http", "https", "ftp").contains(scheme)) {
         val fs = hadoopPath.getFileSystem(hadoopConfiguration)
-        if (!fs.exists(hadoopPath)) {
-          throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
-        }
         val isDir = fs.getFileStatus(hadoopPath).isDirectory
         if (!isLocal && scheme == "file" && isDir) {
           throw new SparkException(s"addFile does not support local directories when not running " +

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 12 additions & 15 deletions
@@ -193,16 +193,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def startPolling(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
-    if (!fs.exists(path)) {
-      var msg = s"Log directory specified does not exist: $logDir"
-      if (logDir == DEFAULT_LOG_DIR) {
-        msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+    try {
+      if (!fs.getFileStatus(path).isDirectory) {
+        throw new IllegalArgumentException(
+          "Logging directory specified is not a directory: %s".format(logDir))
       }
-      throw new IllegalArgumentException(msg)
-    }
-    if (!fs.getFileStatus(path).isDirectory) {
-      throw new IllegalArgumentException(
-        "Logging directory specified is not a directory: %s".format(logDir))
+    } catch {
+      case f: FileNotFoundException =>
+        var msg = s"Log directory specified does not exist: $logDir"
+        if (logDir == DEFAULT_LOG_DIR) {
+          msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+        }
+        throw new FileNotFoundException(msg).initCause(f)
     }

     // Disable the background thread during tests.
@@ -566,12 +568,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
       attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, attempt.logPath)
-          if (fs.exists(path)) {
-            if (!fs.delete(path, true)) {
-              logWarning(s"Error deleting ${path}")
-            }
-          }
+          fs.delete(new Path(logDir, attempt.logPath), true)
         } catch {
           case e: AccessControlException =>
             logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")

core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala

Lines changed: 13 additions & 18 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.rdd

-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}

 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -166,9 +166,6 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val tempOutputPath =
       new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")

-    if (fs.exists(tempOutputPath)) {
-      throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
-    }
     val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

     val fileOutputStream = if (blockSize < 0) {
@@ -240,22 +237,20 @@
       val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
       val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
-      if (fs.exists(partitionerFilePath)) {
-        val fileInputStream = fs.open(partitionerFilePath, bufferSize)
-        val serializer = SparkEnv.get.serializer.newInstance()
-        val deserializeStream = serializer.deserializeStream(fileInputStream)
-        val partitioner = Utils.tryWithSafeFinally[Partitioner] {
-          deserializeStream.readObject[Partitioner]
-        } {
-          deserializeStream.close()
-        }
-        logDebug(s"Read partitioner from $partitionerFilePath")
-        Some(partitioner)
-      } else {
-        logDebug("No partitioner file")
-        None
+      val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val deserializeStream = serializer.deserializeStream(fileInputStream)
+      val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+        deserializeStream.readObject[Partitioner]
+      } {
+        deserializeStream.close()
       }
+      logDebug(s"Read partitioner from $partitionerFilePath")
+      Some(partitioner)
     } catch {
+      case e: FileNotFoundException =>
+        logDebug("No partitioner file", e)
+        None
       case NonFatal(e) =>
         logWarning(s"Error reading partitioner from $checkpointDirPath, " +
           s"partitioner will not be recovered which may lead to performance loss", e)

core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala

Lines changed: 1 addition & 6 deletions
@@ -80,12 +80,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
   /** Clean up the files associated with the checkpoint data for this RDD. */
   def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
     checkpointPath(sc, rddId).foreach { path =>
-      val fs = path.getFileSystem(sc.hadoopConfiguration)
-      if (fs.exists(path)) {
-        if (!fs.delete(path, true)) {
-          logWarning(s"Error deleting ${path.toString()}")
-        }
-      }
+      path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
     }
   }
 }
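The one-line replacement above (and the similar ones in Client.scala and the Hive test suites further down) leans on the behaviour of Hadoop's FileSystem.delete(path, recursive), which is generally expected to return false, not throw, when the path is already absent, so the exists() guard only added a round trip. A small illustrative sketch, not code from the patch:

import org.apache.hadoop.fs.{FileSystem, Path}

object UnconditionalDelete {
  // Delete without probing first; the boolean result says whether anything
  // was actually removed, which is all the callers here need to know.
  def cleanup(fs: FileSystem, path: Path): Unit = {
    if (fs.delete(path, true)) {   // true = recursive
      println(s"Deleted $path")
    }
  }
}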

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 2 additions & 11 deletions
@@ -91,7 +91,7 @@ private[spark] class EventLoggingListener(
    */
   def start() {
     if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
-      throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
+      throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
     }

     val workingPath = logPath + IN_PROGRESS
@@ -100,11 +100,8 @@
     val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"

-    if (shouldOverwrite && fileSystem.exists(path)) {
+    if (shouldOverwrite && fileSystem.delete(path, true)) {
       logWarning(s"Event log $path already exists. Overwriting...")
-      if (!fileSystem.delete(path, true)) {
-        logWarning(s"Error deleting $path")
-      }
     }

     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -301,12 +298,6 @@
    * @return input stream that holds one JSON record per line.
    */
   def openEventLog(log: Path, fs: FileSystem): InputStream = {
-    // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
-    // IOException when a file does not exist, so try our best to throw a proper exception.
-    if (!fs.exists(log)) {
-      throw new FileNotFoundException(s"File $log does not exist.")
-    }
-
     val in = new BufferedInputStream(fs.open(log))

     try {

repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala

Lines changed: 5 additions & 4 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.repl

-import java.io.{ByteArrayOutputStream, FilterInputStream, InputStream, IOException}
+import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream, InputStream, IOException}
 import java.net.{HttpURLConnection, URI, URL, URLEncoder}
 import java.nio.channels.Channels

@@ -147,10 +147,11 @@
   private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(
       pathInDirectory: String): InputStream = {
     val path = new Path(directory, pathInDirectory)
-    if (fileSystem.exists(path)) {
+    try {
       fileSystem.open(path)
-    } else {
-      throw new ClassNotFoundException(s"Class file not found at path $path")
+    } catch {
+      case _: FileNotFoundException =>
+        throw new ClassNotFoundException(s"Class file not found at path $path")
     }
   }

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 2 additions & 3 deletions
@@ -191,9 +191,8 @@
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       val fs = stagingDirPath.getFileSystem(hadoopConf)
-      if (!preserveFiles && fs.exists(stagingDirPath)) {
-        logInfo("Deleting staging directory " + stagingDirPath)
-        fs.delete(stagingDirPath, true)
+      if (!preserveFiles && fs.delete(stagingDirPath, true)) {
+        logInfo(s"Deleted staging directory $stagingDirPath")
       }
     } catch {
       case ioe: IOException =>

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 17 additions & 15 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.streaming.state

-import java.io.{DataInputStream, DataOutputStream, IOException}
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}

 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -172,7 +172,7 @@
       if (tempDeltaFileStream != null) {
         tempDeltaFileStream.close()
       }
-      if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
+      if (tempDeltaFile != null) {
         fs.delete(tempDeltaFile, true)
       }
       logInfo(s"Aborted version $newVersion for $this")
@@ -287,14 +287,12 @@

   /** Initialize the store provider */
   private def initialize(): Unit = {
-    if (!fs.exists(baseDir)) {
+    try {
       fs.mkdirs(baseDir)
-    } else {
-      if (!fs.isDirectory(baseDir)) {
+    } catch {
+      case e: IOException =>
         throw new IllegalStateException(
-          s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
-          s"$baseDir already exists and is not a directory")
-      }
+          s"Cannot use ${id.checkpointLocation} for storing state data for $this: $e ", e)
     }
   }

@@ -348,13 +346,16 @@

   private def updateFromDeltaFile(version: Long, map: MapType): Unit = {
     val fileToRead = deltaFile(version)
-    if (!fs.exists(fileToRead)) {
-      throw new IllegalStateException(
-        s"Error reading delta file $fileToRead of $this: $fileToRead does not exist")
-    }
     var input: DataInputStream = null
+    val sourceStream = try {
+      fs.open(fileToRead)
+    } catch {
+      case f: FileNotFoundException =>
+        throw new IllegalStateException(
+          s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+    }
     try {
-      input = decompressStream(fs.open(fileToRead))
+      input = decompressStream(sourceStream)
       var eof = false

       while(!eof) {
@@ -413,8 +414,6 @@

   private def readSnapshotFile(version: Long): Option[MapType] = {
     val fileToRead = snapshotFile(version)
-    if (!fs.exists(fileToRead)) return None
-
     val map = new MapType()
     var input: DataInputStream = null

@@ -451,6 +450,9 @@
       }
       logInfo(s"Read snapshot file for version $version of $this from $fileToRead")
       Some(map)
+    } catch {
+      case _: FileNotFoundException =>
+        None
     } finally {
       if (input != null) input.close()
     }
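The read paths in this provider now follow an open-then-handle shape: readSnapshotFile() treats FileNotFoundException as "no snapshot for this version" and returns None, while updateFromDeltaFile() rewraps it as an IllegalStateException. A minimal sketch of the open-or-None idiom, assuming a Hadoop FileSystem handle; the helper name is illustrative, not from the patch:

import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

object OpenOrNone {
  // Open in a single call; absence is signalled by open() itself instead of
  // a separate exists() probe, so there is no window for the file to vanish
  // between the check and the read.
  def openIfPresent(fs: FileSystem, path: Path): Option[FSDataInputStream] =
    try {
      Some(fs.open(path))
    } catch {
      case _: FileNotFoundException => None
    }
}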

sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java

Lines changed: 1 addition & 3 deletions
@@ -75,9 +75,7 @@ public void setUp() throws IOException {
     hiveManagedPath = new Path(
       catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
-    if (fs.exists(hiveManagedPath)){
-      fs.delete(hiveManagedPath, true);
-    }
+    fs.delete(hiveManagedPath, true);

     List<String> jsonObjects = new ArrayList<>(10);
     for (int i = 0; i < 10; i++) {

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -382,7 +382,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
     val filesystemPath = new Path(expectedPath)
     val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
-    if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+    fs.delete(filesystemPath, true)

     // It is a managed table when we do not specify the location.
     sql(
