Skip to content

Commit 723faad

Browse files
avkghdongjoon-hyun
authored andcommitted
[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()
### What changes were proposed in this pull request? This change fixes issue SPARK-28912. ### Why are the changes needed? If checkpoint directory is set to name which matches regex pattern used for checkpoint files then logs are flooded with MatchError exceptions and old checkpoint files are not removed. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Manually. 1. Start Hadoop in a pseudo-distributed mode. 2. In another terminal run command nc -lk 9999 3. In the Spark shell execute the following statements: ```scala val ssc = new StreamingContext(sc, Seconds(30)) ssc.checkpoint("hdfs://localhost:9000/checkpoint-01") val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() ``` Closes #25654 from avkgh/SPARK-28912. Authored-by: avk <nullp7r@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 6fb5ef1 commit 723faad

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ object Checkpoint extends Logging {
131131
try {
132132
val statuses = fs.listStatus(path)
133133
if (statuses != null) {
134-
val paths = statuses.map(_.getPath)
135-
val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
134+
val paths = statuses.filterNot(_.isDirectory).map(_.getPath)
135+
val filtered = paths.filter(p => REGEX.findFirstIn(p.getName).nonEmpty)
136136
filtered.sortWith(sortFunc)
137137
} else {
138138
logWarning(s"Listing $path returned null")

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,23 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
847847
checkpointWriter.stop()
848848
}
849849

850+
test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
851+
withTempDir { tempDir =>
852+
val fs = FileSystem.get(tempDir.toURI, new Configuration)
853+
val checkpointDir = tempDir.getAbsolutePath + "/checkpoint-01"
854+
855+
assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
856+
857+
// Ignore files whose parent path match.
858+
fs.create(new Path(checkpointDir, "this-is-matched-before-due-to-parent-path")).close()
859+
assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
860+
861+
// Ignore directories whose names match.
862+
fs.mkdirs(new Path(checkpointDir, "checkpoint-1000000000"))
863+
assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
864+
}
865+
}
866+
850867
test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
851868
// In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
852869
//

0 commit comments

Comments
 (0)