Skip to content

Commit c5b0c98

Browse files
committed
Addressed comments
1 parent ef05009 commit c5b0c98

File tree

3 files changed

+12
-26
lines changed

3 files changed

+12
-26
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,12 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
237237
}
238238

239239
override def exists(path: Path): Boolean = {
240-
fs.exists(path)
240+
try
241+
return fs.getFileStatus(path) != null
242+
catch {
243+
case e: FileNotFoundException =>
244+
return false
245+
}
241246
}
242247

243248
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
@@ -247,17 +252,14 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
247252
}
248253

249254
if (!fs.rename(srcPath, dstPath)) {
250-
// If overwriteIfPossible = false, then we want to find out why the rename failed and
251-
// try to throw the right error.
255+
// FileSystem.rename() returning false is very ambiguous as it can be for many reasons.
256+
// This tries to make a best effort attempt to return the most appropriate exception.
252257
if (fs.exists(dstPath)) {
253-
// Some implementations of FileSystem may only return false instead of throwing
254-
// FileAlreadyExistsException. In that case, explicitly throw the error the error
255-
// if overwriteIfPossible = false. Note that this is definitely not atomic.
256-
// This is only a best-effort attempt to identify the situation when rename returned
257-
// false.
258258
if (!overwriteIfPossible) {
259-
throw new FileAlreadyExistsException(s"$dstPath already exists")
259+
throw new FileAlreadyExistsException(s"Failed to rename as $dstPath already exists")
260260
}
261+
} else if (!fs.exists(srcPath)) {
262+
throw new FileNotFoundException(s"Failed to rename as $srcPath was not found")
261263
} else {
262264
val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
263265
logWarning(msg)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -137,22 +137,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
137137
}
138138
}
139139

140-
/**
141-
* @return the deserialized metadata in a batch file, or None if file not exist.
142-
* @throws IllegalArgumentException when path does not point to a batch file.
143-
*/
144-
def get(batchFile: Path): Option[T] = {
145-
if (fileManager.exists(batchFile)) {
146-
if (isBatchFile(batchFile)) {
147-
get(pathToBatchId(batchFile))
148-
} else {
149-
throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
150-
}
151-
} else {
152-
None
153-
}
154-
}
155-
156140
override def get(batchId: Long): Option[T] = {
157141
val batchMetadataFile = batchIdToPath(batchId)
158142
if (fileManager.exists(batchMetadataFile)) {

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
3434

3535
def createManager(path: Path): CheckpointFileManager
3636

37-
test("mkdirs, list, createAtomic, open, delete") {
37+
test("mkdirs, list, createAtomic, open, delete, exists") {
3838
withTempPath { p =>
3939
val basePath = new Path(p.getAbsolutePath)
4040
val fm = createManager(basePath)

0 commit comments

Comments
 (0)