Commit cbb41a0
[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface
## What changes were proposed in this pull request?

Checkpoint files (offset log files, state store files) in Structured Streaming must be written atomically, such that no partial files are generated (partial files would break the fault-tolerance guarantees). Currently, three locations try to do this individually, and in some cases, incorrectly.

1. HDFSOffsetMetadataLog - This uses a FileManager interface to use any implementation of the `FileSystem` or `FileContext` APIs. It preferentially loads a `FileContext` implementation, because the `FileContext` of HDFS has atomic renames.
2. HDFSBackedStateStore (aka in-memory state store)
   - Writing a version.delta file - This uses `FileSystem` APIs only to perform a rename. This is incorrect, as rename is not atomic in the HDFS `FileSystem` implementation.
   - Writing a snapshot file - Same as above.

#### Current problems:

1. State store behavior is incorrect - the HDFS `FileSystem` implementation does not have atomic rename.
2. Inflexible - Some file systems provide mechanisms other than write-to-temp-file-and-rename for writing atomically and more efficiently. For example, with S3 you can write directly to the final file, and it will be made visible only when the entire file is written and closed correctly. Any failure can terminate the write without making any partial files visible in S3. The current code does not abstract this mechanism well enough for it to be customized.

#### Solution:

1. Introduce a common interface that all three cases above can use to write checkpoint files atomically.
2. This interface must provide the necessary hooks that allow the write-and-rename mechanism to be customized.

This PR does that by introducing the interface `CheckpointFileManager` and modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use it. Similar to the earlier `FileManager`, there are implementations based on the `FileSystem` and `FileContext` APIs, and the latter is preferred so that the default behavior is correct on HDFS.

The key method this interface provides is `createAtomic(path, overwrite)`, which returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All users of this method must either call `close()` to successfully write the file, or `cancel()` in case of an error.

## How was this patch tested?

New tests in `CheckpointFileManagerSuite` and slightly modified existing tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21048 from tdas/SPARK-23966.
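The `close()`-or-`cancel()` contract reduces to a simple caller-side pattern. A minimal sketch, assuming a `manager` obtained from `CheckpointFileManager.create` and a caller-supplied `writeData` function (both hypothetical names, not part of this commit):

```scala
import java.io.OutputStream

import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

// Hypothetical helper showing the intended createAtomic() usage: close()
// publishes the file atomically; cancel() guarantees that no partial file
// becomes visible if anything goes wrong mid-write.
def writeCheckpointFile(
    manager: CheckpointFileManager,
    path: Path)(writeData: OutputStream => Unit): Unit = {
  val out = manager.createAtomic(path, overwriteIfPossible = false)
  try {
    writeData(out)
    out.close()    // success: the file appears in one atomic step
  } catch {
    case NonFatal(e) =>
      out.cancel() // failure: discard the write, leave no partial file
      throw e
  }
}
```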
Parent: 558f31b

9 files changed: +678 -402 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions

```diff
@@ -930,6 +930,13 @@ object SQLConf {
       .intConf
       .createWithDefault(100)
 
+  val STREAMING_CHECKPOINT_FILE_MANAGER_CLASS =
+    buildConf("spark.sql.streaming.checkpointFileManagerClass")
+      .doc("The class used to write checkpoint files atomically. This class must be a subclass " +
+        "of the interface CheckpointFileManager.")
+      .internal()
+      .stringConf
+
   val NDV_MAX_ERROR =
     buildConf("spark.sql.statistics.ndv.maxError")
       .internal()
```
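The conf is internal and has no default; when it is unset, `CheckpointFileManager.create` (shown below) falls back to the built-in `FileContext`/`FileSystem` implementations. A hedged sketch of wiring in a custom class; the class name is a placeholder, and the conf is assumed to reach `create()` through the session's Hadoop configuration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("custom-checkpoint-file-manager")
  .master("local[*]")
  .getOrCreate()

// Placeholder class name: the class must subclass CheckpointFileManager and
// expose a (Path, Configuration) constructor (see create() further below).
spark.conf.set(
  "spark.sql.streaming.checkpointFileManagerClass",
  "com.example.DirectWriteCheckpointFileManager")
```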
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 349 additions & 0 deletions (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.streaming

import java.io.{FileNotFoundException, IOException, OutputStream}
import java.util.{EnumSet, UUID}

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
 * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
 * the key operation this interface provides is `createAtomic(path, overwrite)`, which returns a
 * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
 * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
 * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
 * without overwrite.
 *
 * This higher-level interface above the Hadoop FileSystem is necessary because different
 * implementations of FileSystem/FileContext may need different combinations of operations to
 * provide the desired atomicity guarantees (e.g. write-to-temp-file-and-rename,
 * direct-write-and-cancel-on-failure), and this abstraction allows different implementations
 * while keeping the usage simple (`createAtomic` -> `close` or `cancel`).
 */
trait CheckpointFileManager {

  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._

  /**
   * Create a file and make its contents available atomically after the output stream is closed.
   *
   * @param path                Path to create
   * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
   *                            overwrite the file if it already exists. It should not throw
   *                            any exception if the file exists. However, if false, then the
   *                            implementation must not overwrite if the file already exists and
   *                            must throw `FileAlreadyExistsException` in that case.
   */
  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream

  /** Open a file for reading, or throw an exception if it does not exist. */
  def open(path: Path): FSDataInputStream

  /** List the files in a path that match a filter. */
  def list(path: Path, filter: PathFilter): Array[FileStatus]

  /** List all the files in a path. */
  def list(path: Path): Array[FileStatus] = {
    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
  }

  /** Make a directory at the given path, creating all its parent directories as needed. */
  def mkdirs(path: Path): Unit

  /** Whether the path exists. */
  def exists(path: Path): Boolean

  /** Recursively delete a path if it exists. Should not throw an exception if it doesn't exist. */
  def delete(path: Path): Unit

  /** Whether the default file system this implementation operates on is the local file system. */
  def isLocal: Boolean
}
```
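The `createAtomic` contract above leaves room for the direct-write strategy the commit message describes for S3, not just write-temp-and-rename. Below is a deliberately hypothetical sketch of such an implementation: `ObjectStoreClient` and its three methods are invented stand-ins, not a real API, and `CancellableFSDataOutputStream` is the class defined in the companion object just below.

```scala
import java.io.OutputStream

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Invented stand-in for an object store that buffers uploads out of band and
// publishes an object only on an explicit commit.
trait ObjectStoreClient {
  def startUpload(path: Path): OutputStream // nothing visible yet
  def commit(path: Path): Unit              // object becomes visible atomically
  def abort(path: Path): Unit               // discard all uploaded bytes
}

// Direct-write stream: no temp file, no rename. A production version would also
// guard against double close()/cancel(), as RenameBasedFSDataOutputStream below
// does with its `terminated` flag.
class DirectWriteStream(client: ObjectStoreClient, path: Path)
  extends CancellableFSDataOutputStream(client.startUpload(path)) {

  override def close(): Unit = {
    underlyingStream.close()
    client.commit(path) // the file appears only now, fully written
  }

  override def cancel(): Unit = {
    underlyingStream.close()
    client.abort(path) // no partial object is ever made visible
  }
}
```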
```scala
object CheckpointFileManager extends Logging {

  /**
   * Additional methods in CheckpointFileManager implementations that allow
   * [[RenameBasedFSDataOutputStream]] to achieve atomicity by write-to-temp-file-and-rename.
   */
  sealed trait RenameHelperMethods { self: CheckpointFileManager =>
    /** Create a file, overwriting it if it already exists. */
    def createTempFile(path: Path): FSDataOutputStream

    /**
     * Rename a file.
     *
     * @param srcPath             Source path to rename
     * @param dstPath             Destination path to rename to
     * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt
     *                            to overwrite the file if it already exists. It should not throw
     *                            any exception if the file exists. However, if false, then the
     *                            implementation must not overwrite if the file already exists and
     *                            must throw `FileAlreadyExistsException` in that case.
     */
    def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
  }

  /**
   * An interface to add the cancel() operation to [[FSDataOutputStream]]. This is used
   * mainly by `CheckpointFileManager.createAtomic` to write a file atomically.
   *
   * @see [[CheckpointFileManager]].
   */
  abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream)
    extends FSDataOutputStream(underlyingStream, null) {
    /** Cancel the `underlyingStream` and ensure that the output file is not generated. */
    def cancel(): Unit
  }

  /**
   * An implementation of [[CancellableFSDataOutputStream]] that writes a file atomically by
   * writing to a temporary file and then renaming it.
   */
  sealed class RenameBasedFSDataOutputStream(
      fm: CheckpointFileManager with RenameHelperMethods,
      finalPath: Path,
      tempPath: Path,
      overwriteIfPossible: Boolean)
    extends CancellableFSDataOutputStream(fm.createTempFile(tempPath)) {

    def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite: Boolean) = {
      this(fm, path, generateTempPath(path), overwrite)
    }

    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
    @volatile private var terminated = false

    override def close(): Unit = synchronized {
      try {
        if (terminated) return
        underlyingStream.close()
        try {
          fm.renameTempFile(tempPath, finalPath, overwriteIfPossible)
        } catch {
          case fe: FileAlreadyExistsException =>
            logWarning(
              s"Failed to rename temp file $tempPath to $finalPath because file exists", fe)
            if (!overwriteIfPossible) throw fe
        }
        logInfo(s"Renamed temp file $tempPath to $finalPath")
      } finally {
        terminated = true
      }
    }

    override def cancel(): Unit = synchronized {
      try {
        if (terminated) return
        underlyingStream.close()
        fm.delete(tempPath)
      } catch {
        case NonFatal(e) =>
          logWarning(s"Error cancelling write to $finalPath", e)
      } finally {
        terminated = true
      }
    }
  }

  /** Create an instance of [[CheckpointFileManager]] based on the path and configuration. */
  def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
    val fileManagerClass = hadoopConf.get(
      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
    if (fileManagerClass != null) {
      return Utils.classForName(fileManagerClass)
        .getConstructor(classOf[Path], classOf[Configuration])
        .newInstance(path, hadoopConf)
        .asInstanceOf[CheckpointFileManager]
    }
    try {
      // Try to create a manager based on `FileContext` because HDFS's `FileContext.rename()`
      // gives atomic renames, which is what we rely on for the default implementation
      // `CheckpointFileManager.createAtomic`.
      new FileContextBasedCheckpointFileManager(path, hadoopConf)
    } catch {
      case e: UnsupportedFileSystemException =>
        logWarning(
          "Could not use FileContext API for managing Structured Streaming checkpoint files at " +
            s"$path. Using FileSystem API instead for managing log files. If the implementation " +
            "of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of " +
            "your Structured Streaming application is not guaranteed.")
        new FileSystemBasedCheckpointFileManager(path, hadoopConf)
    }
  }

  private def generateTempPath(path: Path): Path = {
    val tc = org.apache.spark.TaskContext.get
    val tid = if (tc != null) ".TID" + tc.taskAttemptId else ""
    new Path(path.getParent, s".${path.getName}.${UUID.randomUUID}${tid}.tmp")
  }
}
```
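For illustration, here is roughly how the factory above resolves on a stock setup (a sketch; the comments restate the fallback logic in `create()` rather than add new behavior):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// With no custom class configured, create() first tries the FileContext-based
// manager (FileContext.rename() is atomic on HDFS). For URI schemes with no
// FileContext/AbstractFileSystem binding it throws UnsupportedFileSystemException,
// and create() falls back to the FileSystem-based manager with a warning.
val fm = CheckpointFileManager.create(new Path("/tmp/checkpoints"), new Configuration())

// Rename-based writes go through a hidden temp file generated by
// generateTempPath(), e.g. "offsets/10" is first written as
// "offsets/.10.<random-uuid>.tmp" (plus a ".TID<n>" part inside a task).
```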
```scala
/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileSystem]] API. */
class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
  extends CheckpointFileManager with RenameHelperMethods with Logging {

  import CheckpointFileManager._

  protected val fs = path.getFileSystem(hadoopConf)

  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
    fs.listStatus(path, filter)
  }

  override def mkdirs(path: Path): Unit = {
    fs.mkdirs(path, FsPermission.getDirDefault)
  }

  override def createTempFile(path: Path): FSDataOutputStream = {
    fs.create(path, true)
  }

  override def createAtomic(
      path: Path,
      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
  }

  override def open(path: Path): FSDataInputStream = {
    fs.open(path)
  }

  override def exists(path: Path): Boolean = {
    try {
      fs.getFileStatus(path) != null
    } catch {
      case _: FileNotFoundException => false
    }
  }

  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
    if (!overwriteIfPossible && fs.exists(dstPath)) {
      throw new FileAlreadyExistsException(
        s"Failed to rename $srcPath to $dstPath as destination already exists")
    }

    if (!fs.rename(srcPath, dstPath)) {
      // FileSystem.rename() returning false is very ambiguous as it can be for many reasons.
      // This tries to make a best-effort attempt to throw the most appropriate exception.
      if (fs.exists(dstPath)) {
        if (!overwriteIfPossible) {
          throw new FileAlreadyExistsException(s"Failed to rename as $dstPath already exists")
        }
      } else if (!fs.exists(srcPath)) {
        throw new FileNotFoundException(s"Failed to rename as $srcPath was not found")
      } else {
        val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
        logWarning(msg)
        throw new IOException(msg)
      }
    }
  }

  override def delete(path: Path): Unit = {
    try {
      fs.delete(path, true)
    } catch {
      case _: FileNotFoundException =>
        logInfo(s"Failed to delete $path as it does not exist")
        // ignore if file has already been deleted
    }
  }

  override def isLocal: Boolean = fs match {
    case _: LocalFileSystem | _: RawLocalFileSystem => true
    case _ => false
  }
}
```
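Because `create()` instantiates a configured manager reflectively through a `(Path, Configuration)` constructor, subclassing the class above is enough to plug in custom behavior. A hypothetical example (the class name and the extra logging are illustrative only):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Hypothetical manager, usable via spark.sql.streaming.checkpointFileManagerClass.
// The (Path, Configuration) constructor is the signature create() looks up.
class AuditingCheckpointFileManager(path: Path, hadoopConf: Configuration)
  extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {

  override def createAtomic(
      path: Path,
      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
    logInfo(s"Atomic write requested for $path (overwriteIfPossible=$overwriteIfPossible)")
    super.createAtomic(path, overwriteIfPossible)
  }
}
```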
```scala
/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. */
class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
  extends CheckpointFileManager with RenameHelperMethods with Logging {

  import CheckpointFileManager._

  private val fc = if (path.toUri.getScheme == null) {
    FileContext.getFileContext(hadoopConf)
  } else {
    FileContext.getFileContext(path.toUri, hadoopConf)
  }

  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
    fc.util.listStatus(path, filter)
  }

  override def mkdirs(path: Path): Unit = {
    fc.mkdir(path, FsPermission.getDirDefault, true)
  }

  override def createTempFile(path: Path): FSDataOutputStream = {
    import CreateFlag._
    import Options._
    fc.create(
      path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
  }

  override def createAtomic(
      path: Path,
      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
  }

  override def open(path: Path): FSDataInputStream = {
    fc.open(path)
  }

  override def exists(path: Path): Boolean = {
    fc.util.exists(path)
  }

  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
    import Options.Rename._
    fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
  }

  override def delete(path: Path): Unit = {
    try {
      fc.delete(path, true)
    } catch {
      case _: FileNotFoundException =>
        // ignore if file has already been deleted
    }
  }

  override def isLocal: Boolean = fc.getDefaultFileSystem match {
    case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
    case _ => false
  }
}
```
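To make the guarantees concrete, here is a hedged round-trip sketch of the behavior the new `CheckpointFileManagerSuite` is said to cover (this is not the actual suite, just the expected semantics on a local file system):

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val dir = new Path(Files.createTempDirectory("checkpoint-").toString)
val fm = CheckpointFileManager.create(dir, new Configuration())

// close() publishes the file in one atomic step...
val ok = fm.createAtomic(new Path(dir, "metadata"), overwriteIfPossible = false)
ok.write("v1".getBytes(UTF_8))
ok.close()
assert(fm.exists(new Path(dir, "metadata")))

// ...while cancel() leaves nothing behind, not even a partial file.
val aborted = fm.createAtomic(new Path(dir, "offsets"), overwriteIfPossible = false)
aborted.write("partial".getBytes(UTF_8))
aborted.cancel()
assert(!fm.exists(new Path(dir, "offsets")))
```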