Commit 327ac83

heary-cao authored and HyukjinKwon committed
[SPARK-26180][CORE][TEST] Reuse the withTempDir function in Spark Core test cases
## What changes were proposed in this pull request?

Currently, the common `withTempDir` function is used in Spark SQL test cases to handle the `val dir = Utils.createTempDir()` / `Utils.deleteRecursively(dir)` pattern. Unfortunately, the `withTempDir` function cannot be used in Spark Core test cases. This PR shares the `withTempDir` function between Spark SQL and Spark Core to clean up the Spark Core test cases.

## How was this patch tested?

N/A

Closes #23151 from heary-cao/withCreateTempDir.

Authored-by: caoxuewen <cao.xuewen@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
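For reference, the shared helper follows the standard loan pattern: create a temporary directory, pass it to the test body, and always delete it afterwards, even when the body throws. Below is a minimal sketch of what such a helper looks like, assuming it mirrors the Spark SQL version; the trait name `TempDirSuite` is illustrative only, not the actual location used by this PR:

```scala
import java.io.File

import org.apache.spark.util.Utils

// Illustrative trait name; the PR moves the real helper into shared
// test infrastructure so both Spark SQL and Spark Core suites can use it.
trait TempDirSuite {
  // Loan pattern: the temp dir is created before the body runs and is
  // recursively deleted afterwards, even if the body throws.
  protected def withTempDir(f: File => Unit): Unit = {
    val dir = Utils.createTempDir()
    try f(dir) finally {
      Utils.deleteRecursively(dir)
    }
  }
}
```

Each diff below then collapses a `createTempDir` / `try` / `finally` / `deleteRecursively` sequence into a single `withTempDir { dir => ... }` block.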
1 parent 2f6e88f · commit 327ac83

23 files changed: +858 −887 lines

core/src/test/scala/org/apache/spark/CheckpointSuite.scala

Lines changed: 1 addition & 4 deletions
```diff
@@ -586,8 +586,7 @@ object CheckpointSuite {
 class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
 
   test("checkpoint compression") {
-    val checkpointDir = Utils.createTempDir()
-    try {
+    withTempDir { checkpointDir =>
       val conf = new SparkConf()
         .set("spark.checkpoint.compress", "true")
         .set("spark.ui.enabled", "false")
@@ -616,8 +615,6 @@ class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {
 
       // Verify that the compressed content can be read back
       assert(rdd.collect().toSeq === (1 to 20))
-    } finally {
-      Utils.deleteRecursively(checkpointDir)
     }
   }
 }
```

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 49 additions & 48 deletions
```diff
@@ -207,54 +207,55 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
   }
 
   test("automatically cleanup normal checkpoint") {
-    val checkpointDir = Utils.createTempDir()
-    checkpointDir.delete()
-    var rdd = newPairRDD()
-    sc.setCheckpointDir(checkpointDir.toString)
-    rdd.checkpoint()
-    rdd.cache()
-    rdd.collect()
-    var rddId = rdd.id
-
-    // Confirm the checkpoint directory exists
-    assert(ReliableRDDCheckpointData.checkpointPath(sc, rddId).isDefined)
-    val path = ReliableRDDCheckpointData.checkpointPath(sc, rddId).get
-    val fs = path.getFileSystem(sc.hadoopConfiguration)
-    assert(fs.exists(path))
-
-    // the checkpoint is not cleaned by default (without the configuration set)
-    var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
-    rdd = null // Make RDD out of scope, ok if collected earlier
-    runGC()
-    postGCTester.assertCleanup()
-    assert(!fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
-
-    // Verify that checkpoints are NOT cleaned up if the config is not enabled
-    sc.stop()
-    val conf = new SparkConf()
-      .setMaster("local[2]")
-      .setAppName("cleanupCheckpoint")
-      .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
-    sc = new SparkContext(conf)
-    rdd = newPairRDD()
-    sc.setCheckpointDir(checkpointDir.toString)
-    rdd.checkpoint()
-    rdd.cache()
-    rdd.collect()
-    rddId = rdd.id
-
-    // Confirm the checkpoint directory exists
-    assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
-
-    // Reference rdd to defeat any early collection by the JVM
-    rdd.count()
-
-    // Test that GC causes checkpoint data cleanup after dereferencing the RDD
-    postGCTester = new CleanerTester(sc, Seq(rddId))
-    rdd = null // Make RDD out of scope
-    runGC()
-    postGCTester.assertCleanup()
-    assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+    withTempDir { checkpointDir =>
+      checkpointDir.delete()
+      var rdd = newPairRDD()
+      sc.setCheckpointDir(checkpointDir.toString)
+      rdd.checkpoint()
+      rdd.cache()
+      rdd.collect()
+      var rddId = rdd.id
+
+      // Confirm the checkpoint directory exists
+      assert(ReliableRDDCheckpointData.checkpointPath(sc, rddId).isDefined)
+      val path = ReliableRDDCheckpointData.checkpointPath(sc, rddId).get
+      val fs = path.getFileSystem(sc.hadoopConfiguration)
+      assert(fs.exists(path))
+
+      // the checkpoint is not cleaned by default (without the configuration set)
+      var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
+      rdd = null // Make RDD out of scope, ok if collected earlier
+      runGC()
+      postGCTester.assertCleanup()
+      assert(!fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+
+      // Verify that checkpoints are NOT cleaned up if the config is not enabled
+      sc.stop()
+      val conf = new SparkConf()
+        .setMaster("local[2]")
+        .setAppName("cleanupCheckpoint")
+        .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
+      sc = new SparkContext(conf)
+      rdd = newPairRDD()
+      sc.setCheckpointDir(checkpointDir.toString)
+      rdd.checkpoint()
+      rdd.cache()
+      rdd.collect()
+      rddId = rdd.id
+
+      // Confirm the checkpoint directory exists
+      assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+
+      // Reference rdd to defeat any early collection by the JVM
+      rdd.count()
+
+      // Test that GC causes checkpoint data cleanup after dereferencing the RDD
+      postGCTester = new CleanerTester(sc, Seq(rddId))
+      rdd = null // Make RDD out of scope
+      runGC()
+      postGCTester.assertCleanup()
+      assert(fs.exists(ReliableRDDCheckpointData.checkpointPath(sc, rddId).get))
+    }
   }
 
   test("automatically clean up local checkpoint") {
```

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 10 additions & 9 deletions
```diff
@@ -306,17 +306,18 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
       .set("spark.files.openCostInBytes", "0")
       .set("spark.default.parallelism", "1"))
 
-    val tempDir = Utils.createTempDir()
-    val tempDirPath = tempDir.getAbsolutePath
+    withTempDir { tempDir =>
+      val tempDirPath = tempDir.getAbsolutePath
 
-    for (i <- 0 until 8) {
-      val tempFile = new File(tempDir, s"part-0000$i")
-      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
-        StandardCharsets.UTF_8)
-    }
+      for (i <- 0 until 8) {
+        val tempFile = new File(tempDir, s"part-0000$i")
+        Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
+          StandardCharsets.UTF_8)
+      }
 
-    for (p <- Seq(1, 2, 8)) {
-      assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+      for (p <- Seq(1, 2, 8)) {
+        assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+      }
     }
   }
 
```
