Skip to content

Commit f9965f1

Browse files
committed
More improvements
1 parent f1fc175 commit f9965f1

File tree

3 files changed

+16
-7
lines changed

3 files changed

+16
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,8 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
243243
}
244244

245245
try {
246-
if (!fs.rename(srcPath, dstPath)) {
247-
if (fs.exists(dstPath) && !overwriteIfPossible) {
246+
if (!fs.rename(srcPath, dstPath) && !overwriteIfPossible) {
247+
if (fs.exists(dstPath)) {
248248
// Some implementations of FileSystem may not throw FileAlreadyExistsException but
249249
// only return false if file already exists. Explicitly throw the error.
250250
// Note that this is definitely not atomic, so this is only a best-effort attempt

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
148148
// the other used for read+write. We don't want the read-only to delete state files.
149149
if (state == UPDATING) {
150150
state = ABORTED
151-
cancelIfPossible(compressedStream, deltaFileStream)
151+
cancelDeltaFile(compressedStream, deltaFileStream)
152152
} else {
153153
state = ABORTED
154154
}
@@ -403,7 +403,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
403403
output.close()
404404
} catch {
405405
case e: Throwable =>
406-
cancelIfPossible(compressedStream = output, rawStream = rawOutput)
406+
cancelDeltaFile(compressedStream = output, rawStream = rawOutput)
407407
throw e
408408
}
409409
logInfo(s"Written snapshot file for version $version of $this at $targetFile")
@@ -415,7 +415,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
415415
* @param compressedStream the compressed stream.
416416
* @param rawStream the underlying stream which needs to be cancelled.
417417
*/
418-
private def cancelIfPossible(
418+
private def cancelDeltaFile(
419419
compressedStream: DataOutputStream,
420420
rawStream: CancellableFSDataOutputStream): Unit = {
421421
try {

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,25 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
5454
assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
5555
assert(fm.list(basePath, rejectAllFilter).length === 0)
5656

57-
// Create atomic and exists
58-
val path = new Path(s"$dir/file")
57+
// Create atomic without overwrite
58+
var path = new Path(s"$dir/file")
5959
assert(!fm.exists(path))
6060
fm.createAtomic(path, overwriteIfPossible = false).cancel()
6161
assert(!fm.exists(path))
6262
fm.createAtomic(path, overwriteIfPossible = false).close()
6363
assert(fm.exists(path))
6464
intercept[IOException] {
65+
// should throw exception since file exists and overwrite is false
6566
fm.createAtomic(path, overwriteIfPossible = false).close()
6667
}
68+
69+
// Create atomic with overwrite if possible
70+
path = new Path(s"$dir/file2")
71+
assert(!fm.exists(path))
72+
fm.createAtomic(path, overwriteIfPossible = true).cancel()
73+
assert(!fm.exists(path))
74+
fm.createAtomic(path, overwriteIfPossible = true).close()
75+
assert(fm.exists(path))
6776
fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception
6877

6978
// Open and delete

0 commit comments

Comments
 (0)