Skip to content

Commit 16975b6

Browse files
committed
Incorporating feedback from @sameeragarwal
1 parent 016ea9f commit 16975b6

File tree

2 files changed

+18
-22
lines changed

2 files changed

+18
-22
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,11 +1134,14 @@ private[spark] class BlockManager(
11341134
* Called for pro-active replenishment of blocks lost due to executor failures
11351135
*
11361136
* @param blockId blockId being replicate
1137-
* @param replicas existing block managers that have a replica
1138-
* @param maxReps maximum replicas needed
1137+
* @param existingReplicas existing block managers that have a replica
1138+
* @param maxReplicas maximum replicas needed
11391139
* @return
11401140
*/
1141-
def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
1141+
def replicateBlock(
1142+
blockId: BlockId,
1143+
existingReplicas: Set[BlockManagerId],
1144+
maxReplicas: Int): Unit = {
11421145
logInfo(s"Pro-actively replicating $blockId")
11431146
val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
11441147
val data = doGetLocalBytes(blockId, info)
@@ -1147,24 +1150,17 @@ private[spark] class BlockManager(
11471150
info.level.useMemory,
11481151
info.level.useOffHeap,
11491152
info.level.deserialized,
1150-
maxReps)
1153+
maxReplicas)
11511154
(data, storageLevel, info.classTag)
11521155
}
11531156
infoForReplication.foreach { case (data, storageLevel, classTag) =>
1154-
replicate(blockId, data, storageLevel, classTag, replicas)
1157+
replicate(blockId, data, storageLevel, classTag, existingReplicas)
11551158
}
1156-
true
11571159
}
11581160

11591161
/**
11601162
* Replicate block to another node. Note that this is a blocking call that returns after
11611163
* the block has been replicated.
1162-
*
1163-
* @param blockId
1164-
* @param data
1165-
* @param level
1166-
* @param classTag
1167-
* @param existingReplicas
11681164
*/
11691165
private def replicate(
11701166
blockId: BlockId,
@@ -1189,10 +1185,7 @@ private[spark] class BlockManager(
11891185
var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
11901186
var numFailures = 0
11911187

1192-
val initialPeers = {
1193-
val peers = getPeers(false)
1194-
if(existingReplicas.isEmpty) peers else peers.filter(!existingReplicas.contains(_))
1195-
}
1188+
val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))
11961189

11971190
var peersForReplication = blockReplicationPolicy.prioritize(
11981191
blockManagerId,

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ class BlockManagerMasterEndpoint(
6666
mapper
6767
}
6868

69+
val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
70+
6971
logInfo("BlockManagerMasterEndpoint up")
7072

7173
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -189,13 +191,14 @@ class BlockManagerMasterEndpoint(
189191
}
190192

191193
private def removeBlockManager(blockManagerId: BlockManagerId) {
192-
val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
193-
194194
val info = blockManagerInfo(blockManagerId)
195195

196196
// Remove the block manager from blockManagerIdByExecutor.
197197
blockManagerIdByExecutor -= blockManagerId.executorId
198198

199+
// Remove it from blockManagerInfo and remove all the blocks.
200+
blockManagerInfo.remove(blockManagerId)
201+
199202
val iterator = info.blocks.keySet.iterator
200203
while (iterator.hasNext) {
201204
val blockId = iterator.next
@@ -204,8 +207,10 @@ class BlockManagerMasterEndpoint(
204207
if (locations.size == 0) {
205208
blockLocations.remove(blockId)
206209
logWarning(s"No more replicas available for $blockId !")
207-
} else if ((blockId.isRDD || blockId.isInstanceOf[TestBlockId]) && proactivelyReplicate) {
208-
// we only need to proactively replicate RDD blocks
210+
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
211+
// only RDD blocks store data that users explicitly cache so we only need to proactively
212+
// replicate RDD blocks
213+
// broadcast related blocks exist on all executors, so we don't worry about them
209214
// we also need to replicate this behavior for test blocks for unit tests
210215
// we send a message to a randomly chosen executor location to replicate block
211216
// assuming single executor failure, we find out how many replicas existed before failure
@@ -222,8 +227,6 @@ class BlockManagerMasterEndpoint(
222227
}
223228
}
224229
}
225-
// Remove it from blockManagerInfo and remove all the blocks.
226-
blockManagerInfo.remove(blockManagerId)
227230

228231
listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
229232
logInfo(s"Removing block manager $blockManagerId")

0 commit comments

Comments
 (0)