Skip to content

Commit beb9eb3

Browse files
committed
Incorporating suggestions from @JoshRosen and @sameeragarwal
1 parent 16975b6 commit beb9eb3

File tree

3 files changed

+37
-29
lines changed

3 files changed

+37
-29
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,25 +1136,25 @@ private[spark] class BlockManager(
11361136
* @param blockId blockId being replicate
11371137
* @param existingReplicas existing block managers that have a replica
11381138
* @param maxReplicas maximum replicas needed
1139-
* @return
11401139
*/
11411140
def replicateBlock(
1142-
blockId: BlockId,
1143-
existingReplicas: Set[BlockManagerId],
1144-
maxReplicas: Int): Unit = {
1141+
blockId: BlockId,
1142+
existingReplicas: Set[BlockManagerId],
1143+
maxReplicas: Int): Unit = {
11451144
logInfo(s"Pro-actively replicating $blockId")
1146-
val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
1145+
val blockInfo = blockInfoManager.lockForReading(blockId).foreach { info =>
11471146
val data = doGetLocalBytes(blockId, info)
11481147
val storageLevel = StorageLevel(
1149-
info.level.useDisk,
1150-
info.level.useMemory,
1151-
info.level.useOffHeap,
1152-
info.level.deserialized,
1153-
maxReplicas)
1154-
(data, storageLevel, info.classTag)
1155-
}
1156-
infoForReplication.foreach { case (data, storageLevel, classTag) =>
1157-
replicate(blockId, data, storageLevel, classTag, existingReplicas)
1148+
useDisk = info.level.useDisk,
1149+
useMemory = info.level.useMemory,
1150+
useOffHeap = info.level.useOffHeap,
1151+
deserialized = info.level.deserialized,
1152+
replication = maxReplicas)
1153+
try {
1154+
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
1155+
} finally {
1156+
releaseLock(blockId)
1157+
}
11581158
}
11591159
}
11601160

@@ -1163,11 +1163,11 @@ private[spark] class BlockManager(
11631163
* the block has been replicated.
11641164
*/
11651165
private def replicate(
1166-
blockId: BlockId,
1167-
data: ChunkedByteBuffer,
1168-
level: StorageLevel,
1169-
classTag: ClassTag[_],
1170-
existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
1166+
blockId: BlockId,
1167+
data: ChunkedByteBuffer,
1168+
level: StorageLevel,
1169+
classTag: ClassTag[_],
1170+
existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
11711171

11721172
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
11731173
val tLevel = StorageLevel(

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -204,26 +204,25 @@ class BlockManagerMasterEndpoint(
204204
val blockId = iterator.next
205205
val locations = blockLocations.get(blockId)
206206
locations -= blockManagerId
207+
// De-register the block if none of the block managers have it. Otherwise, if pro-active
208+
// replication is enabled, and a block is either an RDD or a test block (the latter is used
209+
// for unit testing), we send a message to a randomly chosen executor location to replicate
210+
// the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
211+
// etc.) as replication doesn't make much sense in that context.
207212
if (locations.size == 0) {
208213
blockLocations.remove(blockId)
209214
logWarning(s"No more replicas available for $blockId !")
210215
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
211-
// only RDD blocks store data that users explicitly cache so we only need to proactively
212-
// replicate RDD blocks
213-
// broadcast related blocks exist on all executors, so we don't worry about them
214-
// we also need to replicate this behavior for test blocks for unit tests
215-
// we send a message to a randomly chosen executor location to replicate block
216-
// assuming single executor failure, we find out how many replicas existed before failure
216+
// As a heursitic, assume single executor failure to find out the number of replicas that
217+
// existed before failure
217218
val maxReplicas = locations.size + 1
218-
219219
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
220220
val blockLocations = locations.toSeq
221221
val candidateBMId = blockLocations(i)
222-
val blockManager = blockManagerInfo.get(candidateBMId)
223-
if(blockManager.isDefined) {
222+
blockManagerInfo.get(candidateBMId).foreach { bm =>
224223
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
225224
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
226-
blockManager.get.slaveEndpoint.ask[Boolean](replicateMsg)
225+
bm.slaveEndpoint.ask[Boolean](replicateMsg)
227226
}
228227
}
229228
}

docs/configuration.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,15 @@ Apart from these, the following properties are also available, and may be useful
952952
storage space to unroll the new block in its entirety.
953953
</td>
954954
</tr>
955+
<tr>
956+
<td><code>spark.storage.replication.proactive<code></td>
957+
<td>false</td>
958+
<td>
959+
Enables proactive block replication for RDD blocks. Cached RDD block replicas lost due to
960+
executor failures are replenished if there are any existing available replicas. This tries
961+
to get the replication level of the block to the initial number.
962+
</td>
963+
</tr>
955964
</table>
956965

957966
#### Execution Behavior

0 commit comments

Comments
 (0)