
Commit c5b1d98

Address Patrick's comments

1 parent a6460d4 commit c5b1d98

10 files changed: +70 -57 lines

core/src/main/scala/org/apache/spark/ContextCleaner.scala (-3)

@@ -80,7 +80,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Stop the cleaner. */
   def stop() {
     stopped = true
-    cleaningThread.interrupt()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -119,8 +118,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
        }
      }
    } catch {
-      case ie: InterruptedException =>
-        if (!stopped) logWarning("Cleaning thread interrupted")
      case t: Throwable => logError("Error in cleaning thread", t)
    }
  }
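
For context on the stop() change above: the cleaner now relies on the stopped flag alone, letting the cleaning thread wind down cooperatively instead of being interrupted. A minimal standalone sketch of that pattern (CleaningThread and doCleanupWork are illustrative names, not from this patch):

    // Sketch: cooperative shutdown via a volatile flag instead of Thread.interrupt()
    class CleaningThread {
      @volatile private var stopped = false

      private val thread = new Thread("cleaning-thread") {
        override def run() {
          while (!stopped) {
            try {
              doCleanupWork()  // hypothetical stand-in for the cleaner's real work
            } catch {
              case t: Throwable => System.err.println("Error in cleaning thread: " + t)
            }
          }
        }
      }
      thread.setDaemon(true)

      def start() { thread.start() }

      /** Flip the flag; the loop exits on its next iteration, no interrupt needed. */
      def stop() { stopped = true }

      private def doCleanupWork() { Thread.sleep(100) }
    }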

core/src/main/scala/org/apache/spark/MapOutputTracker.scala (+17 -13)

@@ -71,13 +71,18 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
  * (driver and worker) use different HashMap to store its metadata.
  */
 private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
-
   private val timeout = AkkaUtils.askTimeout(conf)
 
-  /** Set to the MapOutputTrackerActor living on the driver */
+  /** Set to the MapOutputTrackerActor living on the driver. */
   var trackerActor: ActorRef = _
 
-  /** This HashMap needs to have different storage behavior for driver and worker */
+  /**
+   * This HashMap has different behavior for the master and the workers.
+   *
+   * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
+   * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
+   * master's corresponding HashMap.
+   */
   protected val mapStatuses: Map[Int, Array[MapStatus]]
 
   /**
@@ -87,7 +92,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   protected var epoch: Long = 0
   protected val epochLock = new AnyRef
 
-  /** Remembers which map output locations are currently being fetched on a worker */
+  /** Remembers which map output locations are currently being fetched on a worker. */
   private val fetching = new HashSet[Int]
 
   /**
@@ -173,7 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
-  /** Called to get current epoch number */
+  /** Called to get current epoch number. */
   def getEpoch: Long = {
     epochLock.synchronized {
       return epoch
@@ -195,16 +200,13 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
-  /** Unregister shuffle data */
+  /** Unregister shuffle data. */
   def unregisterShuffle(shuffleId: Int) {
     mapStatuses.remove(shuffleId)
   }
 
-  def stop() {
-    sendTracker(StopMapOutputTracker)
-    mapStatuses.clear()
-    trackerActor = null
-  }
+  /** Stop the tracker. */
+  def stop() { }
 }
 
 /**
@@ -219,7 +221,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
 
   /**
    * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
-   * so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
+   * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
    * Other than these two scenarios, nothing should be dropped from this HashMap.
    */
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
@@ -314,7 +316,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   }
 
   override def stop() {
-    super.stop()
+    sendTracker(StopMapOutputTracker)
+    mapStatuses.clear()
+    trackerActor = null
     metadataCleaner.cancel()
     cachedSerializedStatuses.clear()
   }
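
The new doc comment on mapStatuses describes a source-of-truth HashMap on the master and a read-through cache on the workers. A minimal sketch of that worker-side read path, with fetchFromMaster standing in for the tracker's remote fetch (illustrative names only):

    import scala.collection.mutable

    // Sketch: a cache miss triggers a fetch from the master's corresponding map
    class WorkerMapStatuses(fetchFromMaster: Int => Array[String]) {
      private val mapStatuses = new mutable.HashMap[Int, Array[String]]

      def getStatuses(shuffleId: Int): Array[String] =
        mapStatuses.getOrElseUpdate(shuffleId, fetchFromMaster(shuffleId))
    }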

core/src/main/scala/org/apache/spark/SparkContext.scala (+2 -1)

@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -643,7 +644,7 @@ class SparkContext(
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
    * The variable will be sent to each cluster only once.
    */
-  def broadcast[T](value: T) = {
+  def broadcast[T](value: T): Broadcast[T] = {
     val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
     cleaner.registerBroadcastForCleanup(bc)
     bc
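
With the explicit return type, callers of broadcast see Broadcast[T] without relying on inference. A typical usage sketch (sc is assumed to be a live SparkContext and words an RDD[String]):

    val lookup: Broadcast[Map[String, Int]] = sc.broadcast(Map("a" -> 1, "b" -> 2))
    // Executors read the broadcast value; it is shipped to each node at most once
    val counts = words.map { w => lookup.value.getOrElse(w, 0) }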

core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala (+10 -5)

@@ -62,16 +62,21 @@ abstract class Broadcast[T](val id: Long) extends Serializable {
   def value: T
 
   /**
-   * Remove all persisted state associated with this broadcast on the executors. The next use
-   * of this broadcast on the executors will trigger a remote fetch.
+   * Delete cached copies of this broadcast on the executors. If the broadcast is used after
+   * this is called, it will need to be re-sent to each executor.
    */
   def unpersist()
 
   /**
-   * Remove all persisted state associated with this broadcast on both the executors and the
-   * driver. Overriding implementations should set isValid to false.
+   * Remove all persisted state associated with this broadcast on both the executors and
+   * the driver.
    */
-  private[spark] def destroy()
+  private[spark] def destroy() {
+    _isValid = false
+    onDestroy()
+  }
+
+  protected def onDestroy()
 
   /**
    * If this broadcast is no longer valid, throw an exception.
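
destroy() is now a template method: it invalidates the broadcast, then defers the transport-specific cleanup to onDestroy(), which the HTTP and Torrent subclasses below override. A minimal sketch of the same shape outside Spark (all names illustrative):

    abstract class Resource {
      @volatile protected var _isValid = true

      /** Template method: invalidate first, then run subclass-specific cleanup. */
      final def destroy() {
        _isValid = false   // mirrors Broadcast.destroy() setting _isValid before onDestroy()
        onDestroy()
      }

      protected def onDestroy()
    }

    class FileResource extends Resource {
      protected def onDestroy() { println("deleting backing files") }
    }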

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala (+2 -10)

@@ -54,12 +54,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
     HttpBroadcast.unpersist(id, removeFromDriver = false)
   }
 
-  /**
-   * Remove all persisted state associated with this HTTP Broadcast on both the executors
-   * and the driver.
-   */
-  private[spark] def destroy() {
-    _isValid = false
+  protected def onDestroy() {
     HttpBroadcast.unpersist(id, removeFromDriver = true)
   }
 
@@ -91,7 +86,6 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
 
 private[spark] object HttpBroadcast extends Logging {
   private var initialized = false
-
   private var broadcastDir: File = null
   private var compress: Boolean = false
   private var bufferSize: Int = 65536
@@ -101,11 +95,9 @@ private[spark] object HttpBroadcast extends Logging {
 
   // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
   private val files = new TimeStampedHashSet[String]
-  private var cleaner: MetadataCleaner = null
-
   private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
-
   private var compressionCodec: CompressionCodec = null
+  private var cleaner: MetadataCleaner = null
 
   def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
     synchronized {

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala (+1 -6)

@@ -57,12 +57,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
     TorrentBroadcast.unpersist(id, removeFromDriver = false)
   }
 
-  /**
-   * Remove all persisted state associated with this Torrent broadcast on both the executors
-   * and the driver.
-   */
-  private[spark] def destroy() {
-    _isValid = false
+  protected def onDestroy() {
     TorrentBroadcast.unpersist(id, removeFromDriver = true)
   }
 

core/src/main/scala/org/apache/spark/storage/BlockManager.scala (+2 -2)

@@ -829,12 +829,12 @@ private[spark] class BlockManager(
   /**
    * Remove all blocks belonging to the given broadcast.
    */
-  def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
+  def removeBroadcast(broadcastId: Long, tellMaster: Boolean) {
     logInfo("Removing broadcast " + broadcastId)
     val blocksToRemove = blockInfo.keys.collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }
-    blocksToRemove.foreach { blockId => removeBlock(blockId, removeFromDriver) }
+    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
   }
 
   /**

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala (+5 -4)

@@ -109,20 +109,20 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   /** Remove all blocks belonging to the given RDD. */
   def removeRdd(rddId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
-    future onFailure {
+    future.onFailure {
       case e: Throwable => logError("Failed to remove RDD " + rddId, e)
     }
     if (blocking) {
       Await.result(future, timeout)
     }
   }
 
-  /** Remove all blocks belonging to the given shuffle. */
+  /** Remove all blocks belonging to the given shuffle asynchronously. */
   def removeShuffle(shuffleId: Int) {
     askDriverWithReply(RemoveShuffle(shuffleId))
   }
 
-  /** Remove all blocks belonging to the given broadcast. */
+  /** Remove all blocks belonging to the given broadcast asynchronously. */
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean) {
     askDriverWithReply(RemoveBroadcast(broadcastId, removeFromMaster))
   }
@@ -142,7 +142,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   }
 
   /**
-   * Return the block's status on all block managers, if any.
+   * Return the block's status on all block managers, if any. This can potentially be an
+   * expensive operation and is used mainly for testing.
    *
    * If askSlaves is true, this invokes the master to query each block manager for the most
    * updated block statuses. This is useful when the master is not informed of the given block
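
In removeRdd above, failures are always logged via onFailure, and the blocking flag additionally propagates them to the caller through Await.result. A standalone sketch of that split, using the same Scala-2.10-era Future API as this code (the 30-second timeout is illustrative):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    def remove(blocking: Boolean) {
      val future = Future { /* ask the driver to remove blocks */ 42 }
      future.onFailure {
        case e: Throwable => System.err.println("Failed to remove: " + e)
      }
      if (blocking) {
        // Surfaces both timeouts and the future's failure to the caller
        Await.result(future, 30.seconds)
      }
    }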

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala (+3 -2)

@@ -168,7 +168,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
     // TODO: Consolidate usages of <driver>
-    val removeMsg = RemoveBroadcast(broadcastId)
+    val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
     blockManagerInfo.values
       .filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
       .foreach { bm => bm.slaveActor ! removeMsg }
@@ -255,7 +255,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }
 
   /**
-   * Return the block's status for all block managers, if any.
+   * Return the block's status for all block managers, if any. This can potentially be an
+   * expensive operation and is used mainly for testing.
    *
    * If askSlaves is true, the master queries each block manager for the most updated block
    * statuses. This is useful when the master is not informed of the given block by all block

core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala (+28 -11)

@@ -17,9 +17,11 @@
 
 package org.apache.spark.storage
 
+import scala.concurrent.Future
+
 import akka.actor.Actor
 
-import org.apache.spark.MapOutputTracker
+import org.apache.spark.{Logging, MapOutputTracker}
 import org.apache.spark.storage.BlockManagerMessages._
 
 /**
@@ -30,25 +32,40 @@ private[storage]
 class BlockManagerSlaveActor(
     blockManager: BlockManager,
     mapOutputTracker: MapOutputTracker)
-  extends Actor {
+  extends Actor with Logging {
 
-  override def receive = {
+  import context.dispatcher
 
+  // Operations that involve removing blocks may be slow and should be done asynchronously
+  override def receive = {
     case RemoveBlock(blockId) =>
-      blockManager.removeBlock(blockId)
+      val removeBlock = Future { blockManager.removeBlock(blockId) }
+      removeBlock.onFailure { case t: Throwable =>
+        logError("Error in removing block " + blockId, t)
+      }
 
     case RemoveRdd(rddId) =>
-      val numBlocksRemoved = blockManager.removeRdd(rddId)
-      sender ! numBlocksRemoved
+      val removeRdd = Future { sender ! blockManager.removeRdd(rddId) }
+      removeRdd.onFailure { case t: Throwable =>
+        logError("Error in removing RDD " + rddId, t)
+      }
 
     case RemoveShuffle(shuffleId) =>
-      blockManager.shuffleBlockManager.removeShuffle(shuffleId)
-      if (mapOutputTracker != null) {
-        mapOutputTracker.unregisterShuffle(shuffleId)
+      val removeShuffle = Future {
+        blockManager.shuffleBlockManager.removeShuffle(shuffleId)
+        if (mapOutputTracker != null) {
+          mapOutputTracker.unregisterShuffle(shuffleId)
+        }
+      }
+      removeShuffle.onFailure { case t: Throwable =>
+        logError("Error in removing shuffle " + shuffleId, t)
       }
 
-    case RemoveBroadcast(broadcastId, removeFromDriver) =>
-      blockManager.removeBroadcast(broadcastId, removeFromDriver)
+    case RemoveBroadcast(broadcastId, tellMaster) =>
+      val removeBroadcast = Future { blockManager.removeBroadcast(broadcastId, tellMaster) }
+      removeBroadcast.onFailure { case t: Throwable =>
+        logError("Error in removing broadcast " + broadcastId, t)
+      }
 
     case GetBlockStatus(blockId, _) =>
       sender ! blockManager.getStatus(blockId)
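
The pattern in this file — offload each removal to a Future running on the actor's own dispatcher, and log failures from an onFailure callback — can be distilled into a self-contained sketch (Akka classic actors; SlaveActor and RemoveThing are illustrative names):

    import scala.concurrent.Future
    import akka.actor.Actor
    import akka.event.Logging

    case class RemoveThing(id: Long)

    class SlaveActor extends Actor {
      import context.dispatcher  // the actor's dispatcher doubles as the ExecutionContext
      val log = Logging(context.system, this)

      def receive = {
        case RemoveThing(id) =>
          // Run the slow removal off the message-processing path so the actor stays responsive
          val removal = Future { /* slow, blocking removal work */ }
          removal.onFailure { case t: Throwable =>
            log.error(t, "Error in removing " + id)
          }
      }
    }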
