Commit 41c9ece

Added more unit tests for BlockManager, DiskBlockManager, and ContextCleaner.

1 parent 6222697 · commit 41c9ece

13 files changed, +355 -72 lines

core/src/main/scala/org/apache/spark/ContextCleaner.scala (+9 -3)

@@ -63,6 +63,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
+  /** Whether the cleaning thread will block on cleanup tasks */
+  private val blockOnCleanupTasks = sc.conf.getBoolean("spark.cleaner.referenceTracking.blocking", false)
+
   @volatile private var stopped = false
 
   /** Attach a listener object to get information of when objects are cleaned. */
@@ -112,9 +115,12 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           logDebug("Got cleaning task " + task)
           referenceBuffer -= reference.get
           task match {
-            case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = false)
-            case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = false)
-            case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = false)
+            case CleanRDD(rddId) =>
+              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+            case CleanShuffle(shuffleId) =>
+              doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+            case CleanBroadcast(broadcastId) =>
+              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
           }
         }
       } catch {
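
The new flag defaults to false, so cleanup stays non-blocking unless a user opts in. A minimal sketch of how a driver program would enable blocking cleanup (the app name is arbitrary):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical driver setup: make the cleaning thread wait for each
    // cleanup task to finish instead of firing it off asynchronously.
    val conf = new SparkConf()
      .setAppName("cleaner-blocking-demo")
      .set("spark.cleaner.referenceTracking.blocking", "true")
    val sc = new SparkContext(conf)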

core/src/main/scala/org/apache/spark/SparkContext.scala (+1 -1)

@@ -229,7 +229,7 @@ class SparkContext(
   dagScheduler.start()
 
   private[spark] val cleaner: Option[ContextCleaner] =
-    if (conf.getBoolean("spark.cleaner.automatic", true)) {
+    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else None
 
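
With the rename, reference-tracking cleanup is toggled by "spark.cleaner.referenceTracking" (on by default). A sketch of opting out, assuming a standard SparkConf:

    // Hypothetical: disable the ContextCleaner entirely.
    val conf = new SparkConf().set("spark.cleaner.referenceTracking", "false")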

core/src/main/scala/org/apache/spark/storage/BlockManager.scala (+9)

@@ -218,6 +218,15 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Get the ids of existing blocks that match the given filter. Note that this will
+   * query the blocks stored in the disk block manager (that the block manager
+   * may not know of).
+   */
+  def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
+    (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
+  }
+
   /**
    * Tell the master about the current storage status of a block. This will send a block update
    * message reflecting the current status, *not* the desired storage level in its block info.
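
Because the method unions blockInfo's keys with the disk block manager's listing, it can surface blocks written straight to disk that were never registered with the BlockManager. A sketch of filtering for one RDD's blocks, assuming an in-scope blockManager instance:

    import org.apache.spark.storage.{BlockId, RDDBlockId}

    // Hypothetical: collect every block id belonging to RDD 2, including
    // on-disk blocks the BlockManager itself has not registered.
    val rdd2Blocks: Seq[BlockId] = blockManager.getMatchingBlockIds {
      case RDDBlockId(rddId, _) => rddId == 2
      case _ => false
    }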

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala (+20 -3)

@@ -130,7 +130,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
 
   /** Remove all blocks belonging to the given broadcast. */
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
-    val future = askDriverWithReply[Future[Seq[Int]]](RemoveBroadcast(broadcastId, removeFromMaster))
+    val future = askDriverWithReply[Future[Seq[Int]]](
+      RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
       case e: Throwable =>
         logError("Failed to remove broadcast " + broadcastId +
@@ -156,8 +157,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   }
 
   /**
-   * Return the block's status on all block managers, if any. This can potentially be an
-   * expensive operation and is used mainly for testing.
+   * Return the block's status on all block managers, if any. NOTE: This is a
+   * potentially expensive operation and should only be used for testing.
    *
    * If askSlaves is true, this invokes the master to query each block manager for the most
    * updated block statuses. This is useful when the master is not informed of the given block
@@ -184,6 +185,22 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
     }.toMap
   }
 
+  /**
+   * Return a list of ids of existing blocks whose ids match the given filter. NOTE: This
+   * is a potentially expensive operation and should only be used for testing.
+   *
+   * If askSlaves is true, this invokes the master to query each block manager for the most
+   * updated block statuses. This is useful when the master is not informed of the given block
+   * by all block managers.
+   */
+  def getMatchingBlockIds(
+      filter: BlockId => Boolean,
+      askSlaves: Boolean): Seq[BlockId] = {
+    val msg = GetMatchingBlockIds(filter, askSlaves)
+    val future = askDriverWithReply[Future[Seq[BlockId]]](msg)
+    Await.result(future, timeout)
+  }
+
   /** Stop the driver actor, called only on the Spark driver node */
   def stop() {
     if (driverActor != null) {
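
A sketch of how a test might use the new master-side query, assuming a BlockManagerMaster instance named master is in scope:

    import org.apache.spark.storage.BroadcastBlockId

    // Hypothetical: ask every block manager (via the slaves, for freshness)
    // which broadcast blocks it currently holds.
    val broadcastBlocks = master.getMatchingBlockIds(
      _.isInstanceOf[BroadcastBlockId], askSlaves = true)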

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala (+31 -2)

@@ -96,6 +96,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetBlockStatus(blockId, askSlaves) =>
       sender ! blockStatus(blockId, askSlaves)
 
+    case GetMatchingBlockIds(filter, askSlaves) =>
+      sender ! getMatchingBlockIds(filter, askSlaves)
+
     case RemoveRdd(rddId) =>
       sender ! removeRdd(rddId)
 
@@ -266,8 +269,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }
 
   /**
-   * Return the block's status for all block managers, if any. This can potentially be an
-   * expensive operation and is used mainly for testing.
+   * Return the block's status for all block managers, if any. NOTE: This is a
+   * potentially expensive operation and should only be used for testing.
    *
    * If askSlaves is true, the master queries each block manager for the most updated block
    * statuses. This is useful when the master is not informed of the given block by all block
@@ -294,6 +297,32 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     }.toMap
   }
 
+  /**
+   * Return the ids of blocks present in all the block managers that match the given filter.
+   * NOTE: This is a potentially expensive operation and should only be used for testing.
+   *
+   * If askSlaves is true, the master queries each block manager for the most updated block
+   * statuses. This is useful when the master is not informed of the given block by all block
+   * managers.
+   */
+  private def getMatchingBlockIds(
+      filter: BlockId => Boolean,
+      askSlaves: Boolean): Future[Seq[BlockId]] = {
+    import context.dispatcher
+    val getMatchingBlockIds = GetMatchingBlockIds(filter)
+    Future.sequence(
+      blockManagerInfo.values.map { info =>
+        val future =
+          if (askSlaves) {
+            info.slaveActor.ask(getMatchingBlockIds)(akkaTimeout).mapTo[Seq[BlockId]]
+          } else {
+            Future { info.blocks.keys.filter(filter).toSeq }
+          }
+        future
+      }
+    ).map(_.flatten.toSeq)
+  }
+
   private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
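
The aggregation here is a fan-out/fan-in: one future per block manager, flattened into a single result. A self-contained sketch of the same shape, independent of the actor plumbing:

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._
    import ExecutionContext.Implicits.global

    // Hypothetical stand-in for the per-block-manager queries above.
    def gather[A](perManager: Seq[Future[Seq[A]]]): Future[Seq[A]] =
      Future.sequence(perManager).map(_.flatten)

    val parts = Seq(Future(Seq(1, 2)), Future(Seq(3)))
    Await.result(gather(parts), 5.seconds)  // Seq(1, 2, 3)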

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala (+3)

@@ -115,5 +115,8 @@ private[storage] object BlockManagerMessages {
   case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
     extends ToBlockManagerMaster
 
+  case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)
+    extends ToBlockManagerMaster
+
   case object ExpireDeadHosts extends ToBlockManagerMaster
 }
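
Note that GetMatchingBlockIds carries the filter closure itself, so any filter shipped to remote block managers must be serializable. A sketch of constructing the message from within org.apache.spark.storage (the object is private[storage]):

    import org.apache.spark.storage.BlockManagerMessages.GetMatchingBlockIds

    // Hypothetical: a message selecting all shuffle blocks; the default
    // askSlaves = true means slaves will be queried directly.
    val msg = GetMatchingBlockIds(_.isInstanceOf[ShuffleBlockId])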

core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala (+11 -5)

@@ -39,28 +39,34 @@ class BlockManagerSlaveActor(
   // Operations that involve removing blocks may be slow and should be done asynchronously
   override def receive = {
     case RemoveBlock(blockId) =>
-      doAsync("removing block", sender) {
+      doAsync[Boolean]("removing block", sender) {
        blockManager.removeBlock(blockId)
        true
      }
 
     case RemoveRdd(rddId) =>
-      doAsync("removing RDD", sender) {
+      doAsync[Int]("removing RDD", sender) {
        blockManager.removeRdd(rddId)
      }
 
     case RemoveShuffle(shuffleId) =>
-      doAsync("removing shuffle", sender) {
+      doAsync[Boolean]("removing shuffle", sender) {
+        if (mapOutputTracker != null) {
+          mapOutputTracker.unregisterShuffle(shuffleId)
+        }
        blockManager.shuffleBlockManager.removeShuffle(shuffleId)
      }
 
     case RemoveBroadcast(broadcastId, tellMaster) =>
-      doAsync("removing RDD", sender) {
+      doAsync[Int]("removing broadcast", sender) {
        blockManager.removeBroadcast(broadcastId, tellMaster)
      }
 
     case GetBlockStatus(blockId, _) =>
       sender ! blockManager.getStatus(blockId)
+
+    case GetMatchingBlockIds(filter, _) =>
+      sender ! blockManager.getMatchingBlockIds(filter)
   }
 
   private def doAsync[T](actionMessage: String, responseActor: ActorRef)(body: => T) {
@@ -70,7 +76,7 @@ class BlockManagerSlaveActor(
       response
     }
     future.onSuccess { case response =>
-      logDebug("Successful in " + actionMessage + ", response is " + response)
+      logDebug("Done " + actionMessage + ", response is " + response)
       responseActor ! response
       logDebug("Sent response: " + response + " to " + responseActor)
     }
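
The explicit type parameters ([Boolean], [Int]) pin down what each doAsync body must produce, so a body of the wrong type now fails at compile time instead of silently inferring Any. For reference, a sketch of the master-side ask that the new GetMatchingBlockIds case answers (actor reference and timeout are assumed in scope):

    import akka.pattern.ask

    // Hypothetical ask against a slave actor; mapTo enforces the reply type.
    val futureIds = slaveActor.ask(GetMatchingBlockIds(filter))(akkaTimeout)
      .mapTo[Seq[BlockId]]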

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala (+10)

@@ -47,6 +47,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
   private var shuffleSender : ShuffleSender = null
 
+
   addShutdownHook()
 
   /**
@@ -95,6 +96,15 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
     getBlockLocation(blockId).file.exists()
   }
 
+  /** List all the blocks currently stored on disk by the disk manager. */
+  def getAllBlocks(): Seq[BlockId] = {
+    // Get all the files inside the array of arrays of directories
+    subDirs.flatten.filter(_ != null).flatMap { dir =>
+      val files = dir.list()
+      if (files != null) files else Seq.empty
+    }.map(BlockId.apply)
+  }
+
   /** Produces a unique block id and File suitable for intermediate results. */
   def createTempBlock(): (TempBlockId, File) = {
     var blockId = new TempBlockId(UUID.randomUUID())
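
A sketch of using the new listing to count shuffle blocks physically on disk, assuming an in-scope diskBlockManager:

    import org.apache.spark.storage.ShuffleBlockId

    // Hypothetical: how many shuffle blocks are on disk right now?
    val shuffleBlocksOnDisk =
      diskBlockManager.getAllBlocks().count(_.isInstanceOf[ShuffleBlockId])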

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala (+4 -1)

@@ -171,8 +171,11 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
 
   /** Remove all the blocks / files and metadata related to a particular shuffle. */
   def removeShuffle(shuffleId: ShuffleId): Boolean = {
+    // Do not change the ordering of this: shuffleStates should be removed only
+    // after the corresponding shuffle blocks have been removed
+    val cleaned = removeShuffleBlocks(shuffleId)
     shuffleStates.remove(shuffleId)
-    removeShuffleBlocks(shuffleId)
+    cleaned
   }
 
   /** Remove all the blocks / files related to a particular shuffle. */

core/src/test/scala/org/apache/spark/BroadcastSuite.scala (+6 -6)

@@ -125,7 +125,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // Verify that the broadcast file is created, and blocks are persisted only on the driver
     def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       assert(blockIds.size === 1)
-      val statuses = bmm.getBlockStatus(blockIds.head)
+      val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true)
       assert(statuses.size === 1)
       statuses.head match { case (bm, status) =>
         assert(bm.executorId === "<driver>", "Block should only be on the driver")
@@ -142,7 +142,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // Verify that blocks are persisted in both the executors and the driver
     def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       assert(blockIds.size === 1)
-      val statuses = bmm.getBlockStatus(blockIds.head)
+      val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true)
       assert(statuses.size === numSlaves + 1)
       statuses.foreach { case (_, status) =>
         assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
@@ -155,7 +155,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // is true. In the latter case, also verify that the broadcast file is deleted on the driver.
     def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       assert(blockIds.size === 1)
-      val statuses = bmm.getBlockStatus(blockIds.head)
+      val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true)
       val expectedNumBlocks = if (removeFromDriver) 0 else 1
       val possiblyNot = if (removeFromDriver) "" else " not"
       assert(statuses.size === expectedNumBlocks,
@@ -197,7 +197,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // Verify that blocks are persisted only on the driver
     def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       blockIds.foreach { blockId =>
-        val statuses = bmm.getBlockStatus(blockIds.head)
+        val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
         assert(statuses.size === 1)
         statuses.head match { case (bm, status) =>
           assert(bm.executorId === "<driver>", "Block should only be on the driver")
@@ -211,7 +211,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // Verify that blocks are persisted in both the executors and the driver
     def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       blockIds.foreach { blockId =>
-        val statuses = bmm.getBlockStatus(blockId)
+        val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
         if (blockId.field == "meta") {
           // Meta data is only on the driver
           assert(statuses.size === 1)
@@ -235,7 +235,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
       val expectedNumBlocks = if (removeFromDriver) 0 else 1
       val possiblyNot = if (removeFromDriver) "" else " not"
       blockIds.foreach { blockId =>
-        val statuses = bmm.getBlockStatus(blockId)
+        val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
         assert(statuses.size === expectedNumBlocks,
           "Block should%s be unpersisted on the driver".format(possiblyNot))
       }
