Commit f1fc175

Reduced the public interface further and avoid file deletes in rename

1 parent df7b339
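
For orientation, a minimal sketch of how a caller exercises the reduced public interface after this change. The factory `CheckpointFileManager.create(path, hadoopConf)` on the companion object is assumed (its signature is not part of this diff), and the path and payload are illustrative only:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager

    val hadoopConf = new Configuration()
    val dir = new Path("/tmp/checkpoint") // illustrative location
    val fm = CheckpointFileManager.create(dir, hadoopConf)

    // Contents become visible only when close() succeeds; cancel() aborts
    // the write and the final path never appears.
    val out = fm.createAtomic(new Path(dir, "offsets/0"), overwriteIfPossible = false)
    try {
      out.write("v1".getBytes("UTF-8"))
      out.close() // commit: the temp file is renamed into place
    } catch {
      case e: Throwable =>
        out.cancel() // abort: the temp file is discarded
        throw e
    }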

File tree: 4 files changed, +88 −131 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 78 additions & 108 deletions
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
 import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
@@ -49,6 +50,21 @@ trait CheckpointFileManager {
 
   import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
 
+  /**
+   * Create a file and make its contents available atomically after the output stream is closed.
+   *
+   * @param path                Path to create
+   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+   *                            overwrite the file if it already exists. It should not throw
+   *                            any exception if the file exists. However, if false, then the
+   *                            implementation must not overwrite if the file already exists and
+   *                            must throw `FileAlreadyExistsException` in that case.
+   */
+  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
+
+  /** Open a file for reading, or throw exception if it does not exist. */
+  def open(path: Path): FSDataInputStream
+
   /** List the files in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]
 
@@ -63,33 +79,37 @@ trait CheckpointFileManager {
   /** Whether path exists */
   def exists(path: Path): Boolean
 
-  /** Create a file. */
-  def create(path: Path, overwrite: Boolean): FSDataOutputStream
-
-  /** Create a file and make its contents available atomically after the output stream is closed. */
-  def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream
-
-  /** Open a file for reading, or throw exception if it does not exist. */
-  def open(path: Path): FSDataInputStream
-
-  /** Rename a file. */
-  def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit
-
   /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
   def delete(path: Path): Unit
 
-  /** Copy a local file to a remote file. */
-  def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit
-
-  /** Copy a remote file to the local file. */
-  def copyToLocalFile(srcPath: Path, localDestFile: File): Unit
-
   /** Is the default file system this implementation is operating on the local file system. */
   def isLocal: Boolean
 }
 
 object CheckpointFileManager extends Logging {
 
+  /**
+   * Additional methods in CheckpointFileManager implementations that allow
+   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename.
+   */
+  sealed trait RenameHelperMethods { self: CheckpointFileManager =>
+    /** Create a file with overwrite. */
+    def create(path: Path): FSDataOutputStream
+
+    /**
+     * Rename a file.
+     *
+     * @param srcPath             Source path to rename
+     * @param dstPath             Destination path to rename to
+     * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
+     *                            overwrite the file if it already exists. It should not throw
+     *                            any exception if the file exists. However, if false, then the
+     *                            implementation must not overwrite if the file already exists and
+     *                            must throw `FileAlreadyExistsException` in that case.
+     */
+    def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
+  }
+
   /**
    * An interface to add the cancel() operation to [[FSDataOutputStream]]. This is used
    * mainly by `CheckpointFileManager.createAtomic` to write a file atomically.
@@ -107,13 +127,13 @@ object CheckpointFileManager extends Logging {
    * to a temporary file and then renames.
    */
   sealed class RenameBasedFSDataOutputStream(
-      fm: CheckpointFileManager,
+      fm: CheckpointFileManager with RenameHelperMethods,
       finalPath: Path,
       tempPath: Path,
-      overwrite: Boolean)
-    extends CancellableFSDataOutputStream(fm.create(tempPath, overwrite)) {
+      overwriteIfPossible: Boolean)
+    extends CancellableFSDataOutputStream(fm.create(tempPath)) {
 
-    def this(fm: CheckpointFileManager, path: Path, overwrite: Boolean) = {
+    def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite: Boolean) = {
       this(fm, path, generateTempPath(path), overwrite)
     }
 
@@ -124,7 +144,7 @@ object CheckpointFileManager extends Logging {
       try {
         if (terminated) return
         super.close()
-        fm.rename(tempPath, finalPath, overwrite)
+        fm.rename(tempPath, finalPath, overwriteIfPossible)
         logInfo(s"Renamed temp file $tempPath to $finalPath")
       } finally {
         terminated = true
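
Two things are worth noting in the hunks above: the helper trait is sealed, so only managers declared in this source file can mix it in, and RenameBasedFSDataOutputStream now demands the intersection type in both constructors. Any external CheckpointFileManager must therefore supply atomicity through createAtomic itself. A small type-level sketch (the helper atomicWriter is hypothetical, written against the types shown above):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager._

    // A plain CheckpointFileManager does not compile here: the rename-based
    // stream is reserved for managers that also expose the rename helpers.
    def atomicWriter(
        fm: CheckpointFileManager with RenameHelperMethods,
        path: Path): CancellableFSDataOutputStream =
      new RenameBasedFSDataOutputStream(fm, path, false)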
@@ -164,12 +184,12 @@ object CheckpointFileManager extends Logging {
     } catch {
       case e: UnsupportedFileSystemException =>
         logWarning(
-          "Could not use FileContext API for managing metadata log files at path " +
-            s"$path. Using FileSystem API instead for managing log files. The log may be " +
-            s"inconsistent under failures.")
+          "Could not use FileContext API for managing Structured Streaming checkpoint files at " +
+            s"$path. Using FileSystem API instead for managing log files. If the implementation " +
+            s"of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of " +
+            s"your Structured Streaming query are not guaranteed.")
         new FileSystemBasedCheckpointFileManager(path, hadoopConf)
     }
-    new FileSystemBasedCheckpointFileManager(path, hadoopConf)
   }
 
   private def generateTempPath(path: Path): Path = {
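
The lone deletion after the catch block is a genuine bug fix: the method previously fell through and returned a fresh FileSystemBasedCheckpointFileManager unconditionally, discarding the FileContext-based manager the try block had just built. The resolution logic now reduces to this shape (a condensed paraphrase of the hunk above, not the committed code):

    try {
      // Prefer the FileContext API, whose rename() can overwrite atomically.
      new FileContextBasedCheckpointFileManager(path, hadoopConf)
    } catch {
      case e: UnsupportedFileSystemException =>
        // Fall back to FileSystem; rename atomicity is implementation-defined.
        new FileSystemBasedCheckpointFileManager(path, hadoopConf)
    }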
@@ -182,7 +202,7 @@ object CheckpointFileManager extends Logging {
 
 /** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileSystem]] API. */
 class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
-  extends CheckpointFileManager with Logging {
+  extends CheckpointFileManager with RenameHelperMethods with Logging {
 
   import CheckpointFileManager._
 
@@ -199,12 +219,13 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     fs.mkdirs(path, FsPermission.getDirDefault)
   }
 
-  override def create(path: Path, overwrite: Boolean): FSDataOutputStream = {
-    fs.create(path, overwrite)
+  override def create(path: Path): FSDataOutputStream = {
+    fs.create(path, true)
   }
 
-  override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = {
-    new RenameBasedFSDataOutputStream(this, path, overwrite)
+  override def createAtomic(
+      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
   }
 
   override def open(path: Path): FSDataInputStream = {
@@ -215,47 +236,33 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     fs.exists(path)
   }
 
-  override def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit = {
-    if (!overwrite && fs.exists(dstPath)) {
+  override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+    if (!overwriteIfPossible && fs.exists(dstPath)) {
       throw new FileAlreadyExistsException(
         s"Failed to rename $srcPath to $dstPath as destination already exists")
     }
 
-    def deleteAndRename(prevException: Exception): Unit = {
-      if (overwrite) {
-        try {
-          if (fs.delete(dstPath, true)) {
-            logWarning(s"Failed to delete $dstPath before second attempt to rename")
-          }
-          if (!fs.rename(srcPath, dstPath)) {
-            val msg = s"Failed to rename temp file $srcPath to $dstPath as second attempt to" +
-              s"rename (after delete) returned false"
-            logWarning(msg)
-            val e = new IOException(msg)
-            e.addSuppressed(prevException)
-            throw e
-          }
-        } catch {
-          case NonFatal(e) =>
-            logError(s"Failed to write atomically to $dstPath", e)
-            if (prevException != null) e.addSuppressed(prevException)
-            throw e
-        }
-      } else {
-        throw prevException
-      }
-    }
-
     try {
       if (!fs.rename(srcPath, dstPath)) {
-        val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
-        logWarning(msg)
-        deleteAndRename(new IOException(msg))
+        if (fs.exists(dstPath) && !overwriteIfPossible) {
+          // Some implementations of FileSystem may not throw FileAlreadyExistsException but
+          // only return false if file already exists. Explicitly throw the error.
+          // Note that this is definitely not atomic, so this is only a best-effort attempt
+          // to throw the most appropriate exception when rename returned false.
+          throw new FileAlreadyExistsException(s"$dstPath already exists")
+        } else {
+          val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
+          logWarning(msg)
+          throw new IOException(msg)
+        }
       }
     } catch {
       case fe: FileAlreadyExistsException =>
+        // Some implementations of FileSystem can directly throw FileAlreadyExistsException if the
+        // file already exists. Ignore the error if overwriteIfPossible = true as it is expected
+        // to be best effort.
        logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists", fe)
-        deleteAndRename(fe)
+        if (!overwriteIfPossible) throw fe
     }
   }
 
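This hunk is where the commit title's "avoid file deletes in rename" lands: the deleteAndRename retry, which removed the destination and renamed again, is gone in every branch. An illustrative check of the remaining contract (assumes ScalaTest's intercept and a FileSystemBasedCheckpointFileManager fm with paths src and dst, where dst already exists):

    intercept[FileAlreadyExistsException] {
      fm.rename(src, dst, overwriteIfPossible = false) // surfaces even if fs.rename only returns false
    }
    // With overwriteIfPossible = true the same call is best effort: a
    // FileAlreadyExistsException from the underlying FileSystem is logged and
    // swallowed, and the destination is never deleted first.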

@@ -269,14 +276,6 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     }
   }
 
-  override def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit = {
-    fs.copyFromLocalFile(new Path(localSrcFile.getAbsoluteFile.toURI), destPath)
-  }
-
-  override def copyToLocalFile(srcPath: Path, localDestFile: File): Unit = {
-    fs.copyToLocalFile(srcPath, new Path(localDestFile.getAbsoluteFile.toURI))
-  }
-
   override def isLocal: Boolean = fs match {
     case _: LocalFileSystem | _: RawLocalFileSystem => true
     case _ => false
@@ -286,7 +285,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
 
 /** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. */
 class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
-  extends CheckpointFileManager with Logging {
+  extends CheckpointFileManager with RenameHelperMethods with Logging {
 
   import CheckpointFileManager._
 
@@ -304,14 +303,14 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     fc.mkdir(path, FsPermission.getDirDefault, true)
   }
 
-  override def create(path: Path, overwrite: Boolean): FSDataOutputStream = {
+  override def create(path: Path): FSDataOutputStream = {
     import CreateFlag._
-    val flags = if (overwrite) EnumSet.of(CREATE, OVERWRITE) else EnumSet.of(CREATE)
-    fc.create(path, flags)
+    fc.create(path, EnumSet.of(CREATE, OVERWRITE))
   }
 
-  override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = {
-    new RenameBasedFSDataOutputStream(this, path, overwrite)
+  override def createAtomic(
+      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
   }
 
   override def open(path: Path): FSDataInputStream = {
@@ -322,9 +321,9 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     fc.util.exists(path)
   }
 
-  override def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit = {
+  override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     import Options.Rename._
-    fc.rename(srcPath, dstPath, if (overwrite) OVERWRITE else NONE)
+    fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
   }
 
 
@@ -337,35 +336,6 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     }
   }
 
-  override def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit = {
-    val localFc = FileContext.getLocalFSFileContext
-    var in: InputStream = null
-    var out: OutputStream = null
-    try {
-      in = localFc.open(new Path(localSrcFile.getAbsoluteFile.toURI))
-      out = fc.create(destPath, EnumSet.of(CreateFlag.CREATE))
-      IOUtils.copyLarge(in, out)
-    } finally {
-      if (in != null) in.close()
-      if (out != null) out.close()
-    }
-  }
-
-  override def copyToLocalFile(srcPath: Path, localDstFile: File): Unit = {
-    val localFc = FileContext.getLocalFSFileContext
-    var in: InputStream = null
-    var out: OutputStream = null
-    try {
-      in = fc.open(srcPath)
-      out = localFc.create(
-        new Path(localDstFile.getAbsoluteFile.toURI), EnumSet.of(CreateFlag.CREATE))
-      IOUtils.copyLarge(in, out)
-    } finally {
-      if (in != null) in.close()
-      if (out != null) out.close()
-    }
-  }
-
   override def isLocal: Boolean = fc.getDefaultFileSystem match {
     case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
     case _ => false

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

Lines changed: 1 addition & 1 deletion
@@ -120,7 +120,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * valid behavior, we still need to prevent it from destroying the files.
    */
   private def writeBatchToFile(metadata: T, path: Path): Unit = {
-    val output = fileManager.createAtomic(path, overwrite = false)
+    val output = fileManager.createAtomic(path, overwriteIfPossible = false)
     try {
       serialize(metadata, output)
       output.close()
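
The overwriteIfPossible = false is load-bearing here: a batch file must be written at most once, so a concurrent writer should fail loudly rather than clobber a committed batch. A condensed, hypothetical caller showing the shape of the idea (this is not the committed HDFSMetadataLog.add):

    try {
      writeBatchToFile(metadata, batchIdToPath(batchId))
      true // this writer committed the batch
    } catch {
      case _: FileAlreadyExistsException =>
        false // another writer already committed it; treat as benign
    }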

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 2 additions & 2 deletions
@@ -92,7 +92,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     private val newVersion = version + 1
     @volatile private var state: STATE = UPDATING
     private val finalDeltaFile: Path = deltaFile(newVersion)
-    private lazy val deltaFileStream = fm.createAtomic(finalDeltaFile, overwrite = true)
+    private lazy val deltaFileStream = fm.createAtomic(finalDeltaFile, overwriteIfPossible = true)
     private lazy val compressedStream = compressStream(deltaFileStream)
 
     override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId
@@ -387,7 +387,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     var rawOutput: CancellableFSDataOutputStream = null
     var output: DataOutputStream = null
     try {
-      rawOutput = fm.createAtomic(targetFile, overwrite = true)
+      rawOutput = fm.createAtomic(targetFile, overwriteIfPossible = true)
       output = compressStream(rawOutput)
       val iter = map.entrySet().iterator()
      while(iter.hasNext) {

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala

Lines changed: 7 additions & 20 deletions
@@ -33,7 +33,7 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
 
   def createManager(path: Path): CheckpointFileManager
 
-  test("mkdirs, list, create, rename, delete") {
+  test("mkdirs, list, createAtomic, open, delete") {
     withTempPath { p =>
       val basePath = new Path(p.getAbsolutePath)
       val fm = createManager(basePath)
@@ -54,15 +54,17 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
       assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
       assert(fm.list(basePath, rejectAllFilter).length === 0)
 
-      // Create
+      // Create atomic and exists
       val path = new Path(s"$dir/file")
       assert(!fm.exists(path))
-      fm.create(path, overwrite = false).close()
+      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      assert(!fm.exists(path))
+      fm.createAtomic(path, overwriteIfPossible = false).close()
       assert(fm.exists(path))
       intercept[IOException] {
-        fm.create(path, overwrite = false)
+        fm.createAtomic(path, overwriteIfPossible = false).close()
       }
-      fm.create(path, overwrite = true).close()
+      fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception
 
       // Open and delete
       fm.open(path).close()
@@ -72,21 +74,6 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
         fm.open(path)
       }
       fm.delete(path) // should not throw exception
-
-      // Rename
-      val path1 = new Path(s"$dir/file1")
-      val path2 = new Path(s"$dir/file2")
-      fm.create(path1, overwrite = true).close()
-      assert(fm.exists(path1))
-      fm.rename(path1, path2, overwrite = false)
-
-      val path3 = new Path(s"$dir/file3")
-      fm.create(path3, overwrite = true).close()
-      assert(fm.exists(path3))
-      intercept[FileAlreadyExistsException] {
-        fm.rename(path2, path3, overwrite = false)
-      }
-      fm.rename(path2, path3, overwrite = true)
     }
   }
 
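The rewritten test proves atomicity purely through the public surface: cancel() must leave no file behind, close() must publish it, a second non-overwriting createAtomic must fail, and an overwriting one must not throw; the rename assertions left with the method itself. Concrete suites plug in through createManager, along these lines (a sketch mirroring, not quoting, the companion suites):

    class FileContextManagerSuite extends CheckpointFileManagerTests {
      override def createManager(path: Path): CheckpointFileManager =
        new FileContextBasedCheckpointFileManager(path, new Configuration())
    }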
