Skip to content

Commit 1b23492

Browse files
committed
Fixed more tests
1 parent f9965f1 commit 1b23492

File tree

3 files changed

+57
-51
lines changed

3 files changed

+57
-51
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ object CheckpointFileManager extends Logging {
9494
*/
9595
sealed trait RenameHelperMethods { self => CheckpointFileManager
9696
/** Create a file with overwrite. */
97-
def create(path: Path): FSDataOutputStream
97+
def createTempFile(path: Path): FSDataOutputStream
9898

9999
/**
100100
* Rename a file.
@@ -107,7 +107,7 @@ object CheckpointFileManager extends Logging {
107107
* implementation must not overwrite if the file alraedy exists and
108108
* must throw `FileAlreadyExistsException` in that case.
109109
*/
110-
def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
110+
def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
111111
}
112112

113113
/**
@@ -131,7 +131,7 @@ object CheckpointFileManager extends Logging {
131131
finalPath: Path,
132132
tempPath: Path,
133133
overwriteIfPossible: Boolean)
134-
extends CancellableFSDataOutputStream(fm.create(tempPath)) {
134+
extends CancellableFSDataOutputStream(fm.createTempFile(tempPath)) {
135135

136136
def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite: Boolean) = {
137137
this(fm, path, generateTempPath(path), overwrite)
@@ -143,8 +143,15 @@ object CheckpointFileManager extends Logging {
143143
override def close(): Unit = synchronized {
144144
try {
145145
if (terminated) return
146-
super.close()
147-
fm.rename(tempPath, finalPath, overwriteIfPossible)
146+
underlyingStream.close()
147+
try {
148+
fm.renameTempFile(tempPath, finalPath, overwriteIfPossible)
149+
} catch {
150+
case fe: FileAlreadyExistsException =>
151+
logWarning(
152+
s"Failed to rename temp file $tempPath to $finalPath because file exists", fe)
153+
if (!overwriteIfPossible) throw fe
154+
}
148155
logInfo(s"Renamed temp file $tempPath to $finalPath")
149156
} finally {
150157
terminated = true
@@ -208,9 +215,6 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
208215

209216
protected val fs = path.getFileSystem(hadoopConf)
210217

211-
fs.setVerifyChecksum(false)
212-
fs.setWriteChecksum(false)
213-
214218
override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
215219
fs.listStatus(path, filter)
216220
}
@@ -219,7 +223,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
219223
fs.mkdirs(path, FsPermission.getDirDefault)
220224
}
221225

222-
override def create(path: Path): FSDataOutputStream = {
226+
override def createTempFile(path: Path): FSDataOutputStream = {
223227
fs.create(path, true)
224228
}
225229

@@ -236,33 +240,29 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
236240
fs.exists(path)
237241
}
238242

239-
override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
243+
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
240244
if (!overwriteIfPossible && fs.exists(dstPath)) {
241245
throw new FileAlreadyExistsException(
242246
s"Failed to rename $srcPath to $dstPath as destination already exists")
243247
}
244248

245-
try {
246-
if (!fs.rename(srcPath, dstPath) && !overwriteIfPossible) {
247-
if (fs.exists(dstPath)) {
248-
// Some implementations of FileSystem may not throw FileAlreadyExistsException but
249-
// only return false if file already exists. Explicitly throw the error.
250-
// Note that this is definitely not atomic, so this is only a best-effort attempt
251-
// to throw the most appropriate exception when rename returned false.
249+
if (!fs.rename(srcPath, dstPath)) {
250+
// If overwriteIfPossible = false, then we want to find out why the rename failed and
251+
// try to throw the right error.
252+
if (fs.exists(dstPath)) {
253+
// Some implementations of FileSystem may only return false instead of throwing
254+
// FileAlreadyExistsException. In that case, explicitly throw the error the error
255+
// if overwriteIfPossible = false. Note that this is definitely not atomic.
256+
// This is only a best-effort attempt to identify the situation when rename returned
257+
// false.
258+
if (!overwriteIfPossible) {
252259
throw new FileAlreadyExistsException(s"$dstPath already exists")
253-
} else {
254-
val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
255-
logWarning(msg)
256-
throw new IOException(msg)
257260
}
261+
} else {
262+
val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
263+
logWarning(msg)
264+
throw new IOException(msg)
258265
}
259-
} catch {
260-
case fe: FileAlreadyExistsException =>
261-
// Some implementation of FileSystem can directly throw FileAlreadyExistsException if file
262-
// already exists. Ignore the error if overwriteIfPossible = true as it is expected to be
263-
// best effort.
264-
logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists", fe)
265-
if (!overwriteIfPossible) throw fe
266266
}
267267
}
268268

@@ -303,9 +303,11 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
303303
fc.mkdir(path, FsPermission.getDirDefault, true)
304304
}
305305

306-
override def create(path: Path): FSDataOutputStream = {
306+
override def createTempFile(path: Path): FSDataOutputStream = {
307307
import CreateFlag._
308-
fc.create(path, EnumSet.of(CREATE, OVERWRITE))
308+
import Options._
309+
fc.create(
310+
path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
309311
}
310312

311313
override def createAtomic(
@@ -321,7 +323,7 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
321323
fc.util.exists(path)
322324
}
323325

324-
override def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
326+
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
325327
import Options.Rename._
326328
fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
327329
}

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
2525
import org.apache.hadoop.fs._
2626

2727
import org.apache.spark.SparkFunSuite
28+
import org.apache.spark.sql.catalyst.util.quietly
2829
import org.apache.spark.sql.internal.SQLConf
2930
import org.apache.spark.sql.test.SharedSparkSession
3031
import org.apache.spark.util.Utils
@@ -61,9 +62,11 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
6162
assert(!fm.exists(path))
6263
fm.createAtomic(path, overwriteIfPossible = false).close()
6364
assert(fm.exists(path))
64-
intercept[IOException] {
65-
// should throw exception since file exists and overwrite is false
66-
fm.createAtomic(path, overwriteIfPossible = false).close()
65+
quietly {
66+
intercept[IOException] {
67+
// should throw exception since file exists and overwrite is false
68+
fm.createAtomic(path, overwriteIfPossible = false).close()
69+
}
6770
}
6871

6972
// Create atomic with overwrite if possible
@@ -107,22 +110,22 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {
107110

108111
test("CheckpointFileManager.create() should fallback from FileContext to FileSystem") {
109112
import FakeFileSystem.scheme
110-
spark.conf.set(
111-
s"fs.$scheme.impl",
112-
classOf[FakeFileSystem].getName)
113-
withTempDir { temp =>
114-
val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
115-
assert(metadataLog.add(0, "batch0"))
116-
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
117-
assert(metadataLog.get(0) === Some("batch0"))
118-
assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))
119-
120-
121-
val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
122-
assert(metadataLog2.get(0) === Some("batch0"))
123-
assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
124-
assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))
113+
spark.conf.set(s"fs.$scheme.impl", classOf[FakeFileSystem].getName)
114+
quietly {
115+
withTempDir { temp =>
116+
val metadataLog = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
117+
assert(metadataLog.add(0, "batch0"))
118+
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
119+
assert(metadataLog.get(0) === Some("batch0"))
120+
assert(metadataLog.get(None, Some(0)) === Array(0 -> "batch0"))
121+
125122

123+
val metadataLog2 = new HDFSMetadataLog[String](spark, s"$scheme://${temp.toURI.getPath}")
124+
assert(metadataLog2.get(0) === Some("batch0"))
125+
assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
126+
assert(metadataLog2.get(None, Some(0)) === Array(0 -> "batch0"))
127+
128+
}
126129
}
127130
}
128131
}

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
8585

8686
// There should be exactly one file, called "2", in the metadata directory.
8787
// This check also tests for regressions of SPARK-17475
88-
val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
88+
val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
89+
.filter(!_.getName.startsWith(".")).toSeq
8990
assert(allFiles.size == 1)
9091
assert(allFiles(0).getName() == "2")
9192
}
@@ -136,7 +137,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
136137
}
137138
}
138139

139-
test("HDFSMetadataLog: metadata directory collision") {
140+
testQuietly("HDFSMetadataLog: metadata directory collision") {
140141
withTempDir { temp =>
141142
val waiter = new Waiter
142143
val maxBatchId = 100

0 commit comments

Comments
 (0)