Commit b377ef1

Sergei Lebedev authored and cloud-fan committed
[SPARK-22227][CORE] DiskBlockManager.getAllBlocks now tolerates temp files
## What changes were proposed in this pull request?

Prior to this commit, getAllBlocks implicitly assumed that the directories managed by the DiskBlockManager contain only the files corresponding to valid block IDs. In reality, this assumption was violated during shuffle, which produces temporary files in the same directory as the resulting blocks. As a result, calls to getAllBlocks during shuffle were unreliable.

The fix could be made more efficient, but this is probably good enough.

## How was this patch tested?

`DiskBlockManagerSuite`

Author: Sergei Lebedev <s.lebedev@criteo.com>

Closes apache#19458 from superbobry/block-id-option.
1 parent d212ef1 commit b377ef1
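
To make the failure mode concrete, here is a minimal sketch (not part of the commit, e.g. pasted into a Spark shell) of the behavior change at the BlockId level; it uses only the public @DeveloperApi surface of org.apache.spark.storage.BlockId:

    import java.util.UUID

    import org.apache.spark.storage.BlockId

    // Shuffle writers drop files named "temp_shuffle_<uuid>" into the same
    // directories as finished blocks.
    val name = "temp_shuffle_" + UUID.randomUUID()

    // Before this commit, BlockId.apply had no pattern for temp files and threw
    // IllegalStateException("Unrecognized BlockId: temp_shuffle_..."), so any
    // getAllBlocks() call racing with a shuffle could fail. After this commit,
    // the name parses into a temp block id and round-trips.
    val parsed = BlockId(name)
    assert(parsed.name == name)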

File tree

4 files changed (+33, -10 lines)

core/src/main/scala/org/apache/spark/storage/BlockId.scala
Lines changed: 13 additions & 3 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import java.util.UUID

+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi

 /**
@@ -95,6 +96,10 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
   override def name: String = "test_" + id
 }

+@DeveloperApi
+class UnrecognizedBlockId(name: String)
+    extends SparkException(s"Failed to parse $name into a block ID")
+
 @DeveloperApi
 object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
@@ -104,10 +109,11 @@ object BlockId {
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
+  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
+  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
   val TEST = "test_(.*)".r

-  /** Converts a BlockId "name" String back into a BlockId. */
-  def apply(id: String): BlockId = id match {
+  def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
       RDDBlockId(rddId.toInt, splitIndex.toInt)
     case SHUFFLE(shuffleId, mapId, reduceId) =>
@@ -122,9 +128,13 @@ object BlockId {
       TaskResultBlockId(taskId.toLong)
     case STREAM(streamId, uniqueId) =>
       StreamBlockId(streamId.toInt, uniqueId.toLong)
+    case TEMP_LOCAL(uuid) =>
+      TempLocalBlockId(UUID.fromString(uuid))
+    case TEMP_SHUFFLE(uuid) =>
+      TempShuffleBlockId(UUID.fromString(uuid))
     case TEST(value) =>
       TestBlockId(value)
     case _ =>
-      throw new IllegalStateException("Unrecognized BlockId: " + id)
+      throw new UnrecognizedBlockId(name)
   }
 }
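
A quick round-trip sketch of the two new patterns (a hypothetical snippet, not part of the commit; it must live in Spark's own org.apache.spark.storage package because the temp block id case classes are private[spark]):

    package org.apache.spark.storage

    import java.util.UUID

    object TempBlockIdRoundTrip {
      def main(args: Array[String]): Unit = {
        val uuid = UUID.randomUUID()

        // Both new regexes accept UUID-based names and reconstruct equal ids.
        assert(BlockId(TempLocalBlockId(uuid).name) == TempLocalBlockId(uuid))
        assert(BlockId(TempShuffleBlockId(uuid).name) == TempShuffleBlockId(uuid))

        // Unrecognized names now raise a dedicated, catchable exception type
        // instead of a generic IllegalStateException.
        try BlockId("not-a-block") catch {
          case _: UnrecognizedBlockId => () // this is what getAllBlocks now skips
        }
      }
    }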

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
Lines changed: 10 additions & 1 deletion

@@ -100,7 +100,16 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean)

   /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
-    getAllFiles().map(f => BlockId(f.getName))
+    getAllFiles().flatMap { f =>
+      try {
+        Some(BlockId(f.getName))
+      } catch {
+        case _: UnrecognizedBlockId =>
+          // Skip files which do not correspond to blocks, for example temporary
+          // files created by [[SortShuffleWriter]].
+          None
+      }
+    }
   }

   /** Produces a unique block id and File suitable for storing local intermediate results. */
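
A note on the design choice here: a shorter alternative would be scala.util.Try, sketched below with a hypothetical helper name. Try converts every non-fatal exception to None, whereas the committed catch hides only UnrecognizedBlockId and lets unexpected failures surface:

    import java.io.File

    import scala.util.Try

    import org.apache.spark.storage.BlockId

    object LenientListing {
      // Hypothetical, for comparison only: this also skips unparseable names,
      // but Try would additionally swallow any other non-fatal error raised
      // while parsing, masking real bugs.
      def getAllBlocksLenient(files: Seq[File]): Seq[BlockId] =
        files.flatMap(f => Try(BlockId(f.getName)).toOption)
    }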

core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
Lines changed: 3 additions & 6 deletions

@@ -35,13 +35,8 @@ class BlockIdSuite extends SparkFunSuite {
   }

   test("test-bad-deserialization") {
-    try {
-      // Try to deserialize an invalid block id.
+    intercept[UnrecognizedBlockId] {
       BlockId("myblock")
-      fail()
-    } catch {
-      case e: IllegalStateException => // OK
-      case _: Throwable => fail()
     }
   }

@@ -139,6 +134,7 @@ class BlockIdSuite extends SparkFunSuite {
     assert(id.id.getMostSignificantBits() === 5)
     assert(id.id.getLeastSignificantBits() === 2)
     assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
   }

   test("temp shuffle") {
@@ -151,6 +147,7 @@ class BlockIdSuite extends SparkFunSuite {
     assert(id.id.getMostSignificantBits() === 1)
     assert(id.id.getLeastSignificantBits() === 2)
     assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
   }

   test("test") {

core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
Lines changed: 7 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.storage

 import java.io.{File, FileWriter}
+import java.util.UUID

 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

@@ -79,6 +80,12 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
     assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
   }

+  test("SPARK-22227: non-block files are skipped") {
+    val file = diskBlockManager.getFile("unmanaged_file")
+    writeToFile(file, 10)
+    assert(diskBlockManager.getAllBlocks().isEmpty)
+  }
+
   def writeToFile(file: File, numBytes: Int) {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)
