Skip to content

Commit 12177b1

Browse files
committed
Addressed one more comment
1 parent 75b9b18 commit 12177b1

File tree

3 files changed

+16
-15
lines changed

3 files changed

+16
-15
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ class CheckpointFileManagerSuite extends SparkFunSuite with SharedSparkSession {
101101
test("CheckpointFileManager.create() should pick up user-specified class from conf") {
102102
withSQLConf(
103103
SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key ->
104-
classOf[TestCheckpointFileManager].getName) {
104+
classOf[CreateAtomicTestManager].getName) {
105105
val fileManager =
106106
CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf)
107-
assert(fileManager.isInstanceOf[TestCheckpointFileManager])
107+
assert(fileManager.isInstanceOf[CreateAtomicTestManager])
108108
}
109109
}
110110

@@ -142,34 +142,34 @@ class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTes
142142

143143

144144
/** A fake implementation to test different characteristics of CheckpointFileManager interface */
145-
class TestCheckpointFileManager(path: Path, hadoopConf: Configuration)
145+
class CreateAtomicTestManager(path: Path, hadoopConf: Configuration)
146146
extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {
147147

148148
import CheckpointFileManager._
149149

150150
override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = {
151-
if (TestCheckpointFileManager.shouldFailInCreateAtomic) {
152-
TestCheckpointFileManager.cancelCalledInCreateAtomic = false
151+
if (CreateAtomicTestManager.shouldFailInCreateAtomic) {
152+
CreateAtomicTestManager.cancelCalledInCreateAtomic = false
153153
}
154154
val originalOut = super.createAtomic(path, overwrite)
155155

156156
new CancellableFSDataOutputStream(originalOut) {
157157
override def close(): Unit = {
158-
if (TestCheckpointFileManager.shouldFailInCreateAtomic) {
158+
if (CreateAtomicTestManager.shouldFailInCreateAtomic) {
159159
throw new IOException("Copy failed intentionally")
160160
}
161161
super.close()
162162
}
163163

164164
override def cancel(): Unit = {
165-
TestCheckpointFileManager.cancelCalledInCreateAtomic = true
165+
CreateAtomicTestManager.cancelCalledInCreateAtomic = true
166166
originalOut.cancel()
167167
}
168168
}
169169
}
170170
}
171171

172-
object TestCheckpointFileManager {
172+
object CreateAtomicTestManager {
173173
@volatile var shouldFailInCreateAtomic = false
174174
@volatile var cancelCalledInCreateAtomic = false
175175
}

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets._
2222

2323
import org.apache.spark.{SparkConf, SparkFunSuite}
2424
import org.apache.spark.sql.SparkSession
25-
import org.apache.spark.sql.execution.streaming.FakeFileSystem._
2625
import org.apache.spark.sql.test.SharedSQLContext
2726

2827
class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext {

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -475,34 +475,36 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
475475
val hadoopConf = new Configuration()
476476
hadoopConf.set(
477477
SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
478-
classOf[TestCheckpointFileManager].getName)
478+
classOf[CreateAtomicTestManager].getName)
479479
val remoteDir = Utils.createTempDir().getAbsolutePath
480480

481481
val provider = newStoreProvider(
482482
opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = hadoopConf)
483483

484484
// Disable failure of output stream and generate versions
485-
TestCheckpointFileManager.shouldFailInCreateAtomic = false
485+
CreateAtomicTestManager.shouldFailInCreateAtomic = false
486486
for (version <- 1 to 10) {
487487
val store = provider.getStore(version - 1)
488488
put(store, version.toString, version) // update "1" -> 1, "2" -> 2, ...
489489
store.commit()
490490
}
491491
val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
492492

493+
CreateAtomicTestManager.cancelCalledInCreateAtomic = false
493494
val store = provider.getStore(10)
494495
// Fail commit for next version and verify that reloading resets the files
495-
TestCheckpointFileManager.shouldFailInCreateAtomic = true
496+
CreateAtomicTestManager.shouldFailInCreateAtomic = true
496497
put(store, "11", 11)
497498
val e = intercept[IllegalStateException] { quietly { store.commit() } }
498-
assert(e.getCause.isInstanceOf[IOException], "Was waiting the IOException to be thrown")
499-
TestCheckpointFileManager.shouldFailInCreateAtomic = false
499+
assert(e.getCause.isInstanceOf[IOException])
500+
CreateAtomicTestManager.shouldFailInCreateAtomic = false
500501

501502
// Abort commit for next version and verify that reloading resets the files
503+
CreateAtomicTestManager.cancelCalledInCreateAtomic = false
502504
val store2 = provider.getStore(10)
503505
put(store2, "11", 11)
504506
store2.abort()
505-
assert(TestCheckpointFileManager.cancelCalledInCreateAtomic)
507+
assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
506508
}
507509

508510
override def newStoreProvider(): HDFSBackedStateStoreProvider = {

0 commit comments

Comments
 (0)