
Commit e24f21b

gf53520 authored and zsxwing committed
[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job
## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

PR #17012 fixed restarting a Structured Streaming application that uses HDFS as its file system, but it left a problem: the temporary version of a delta file is still kept in HDFS, and Structured Streaming never deletes that tmp file after the streaming job restarts. This patch deletes the leftover tmp file when the final delta file already exists.

## How was this patch tested?

Unit tests.

Author: guifeng <guifengleaf@gmail.com>

Closes #17124 from gf53520/SPARK-19779.
1 parent f37bb14 commit e24f21b

File tree: 2 files changed (+10, −1 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 3 additions & 1 deletion
@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)
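To make the control flow above easier to read in isolation, here is a minimal, self-contained sketch of the same rename-or-delete commit pattern. The paths and the standalone object are hypothetical; the provider's real commit path carries additional state (version maps, output streams) not shown here.

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DeltaCommitSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    // Hypothetical stand-ins for the provider's temp and final delta files.
    val tempDeltaFile  = new Path("/tmp/state/0/temp-12345")
    val finalDeltaFile = new Path("/tmp/state/0/1.delta")

    if (fs.exists(finalDeltaFile)) {
      // A previous run of this batch already committed the delta. Rerunning a
      // batch must produce the same output (SPARK-19677), so keep the existing
      // file and remove the now-redundant temp file instead of leaking it.
      fs.delete(tempDeltaFile, true)
    } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
      throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
    }
  }
}
```

Rename serves as the publish step here; the preceding exists-check makes the commit idempotent across restarts, at the cost of preferring whichever writer committed first.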

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 7 additions & 0 deletions
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }
 
   test("corrupted file handling") {

0 commit comments