[SPARK-15355] [CORE] Proactive block replication #14412

Closed

43 changes: 37 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1130,6 +1130,34 @@ private[spark] class BlockManager(
}
}

/**
* Called for pro-active replenishment of blocks lost due to executor failures
*
* @param blockId blockId being replicated
* @param existingReplicas existing block managers that have a replica
* @param maxReplicas maximum replicas needed
*/
def replicateBlock(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
maxReplicas: Int): Unit = {
logInfo(s"Pro-actively replicating $blockId")
blockInfoManager.lockForReading(blockId).foreach { info =>
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
useDisk = info.level.useDisk,
useMemory = info.level.useMemory,
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
try {
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
} finally {
releaseLock(blockId)
}
}
}

/**
* Replicate block to another node. Note that this is a blocking call that returns after
* the block has been replicated.
@@ -1138,7 +1166,8 @@ private[spark] class BlockManager(
blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
classTag: ClassTag[_],
existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {

val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val tLevel = StorageLevel(
@@ -1152,20 +1181,22 @@

val startTime = System.nanoTime

var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0

val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))

var peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId,
getPeers(false),
mutable.HashSet.empty,
initialPeers,
peersReplicatedTo,
blockId,
numPeersToReplicateTo)

while(numFailures <= maxReplicationFailures &&
!peersForReplication.isEmpty &&
peersReplicatedTo.size != numPeersToReplicateTo) {
!peersForReplication.isEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
Review comment (Member):

I think it's still valid to replace the inequality with a strictly-less-than check, but just out of curiosity: can the number of peersReplicatedTo ever exceed numPeersToReplicateTo?

Reply (Contributor, author):

One scenario I can think of is if an executor holding the block being replicated is lost (due to, say, a delayed heartbeat) and then joins back again. The current implementation would recognize that the block manager needs to re-register and will report all of its blocks. The probability of this happening increases with pro-active replication, I think.
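
A minimal standalone sketch of why the guard changed from `!=` to `<` (this snippet is not part of the patch; the ids and counts are made up): because peersReplicatedTo is now pre-seeded with existingReplicas, its size can start at or above numPeersToReplicateTo.

```scala
import scala.collection.mutable

// Standalone illustration -- plain strings stand in for BlockManagerId instances.
val existingReplicas = Set("bm-1", "bm-2", "bm-3")
val numPeersToReplicateTo = 2 // i.e. maxReplicas - 1

// Mirrors the patched initialization: the set starts pre-seeded with the known replicas.
val peersReplicatedTo = mutable.HashSet.empty[String] ++ existingReplicas

// A `!=` guard is still true here (3 != 2), so the loop would keep trying to replicate
// until peers or retries run out; the `<` guard is already false, so it stops immediately.
assert(peersReplicatedTo.size != numPeersToReplicateTo)
assert(!(peersReplicatedTo.size < numPeersToReplicateTo))
```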

val peer = peersForReplication.head
try {
val onePeerStartTime = System.nanoTime
@@ -22,6 +22,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +66,8 @@ class BlockManagerMasterEndpoint(
mapper
}

val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
Review comment (Contributor):

Please document this new configuration in docs/configuration.md.


logInfo("BlockManagerMasterEndpoint up")

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -195,17 +198,38 @@ class BlockManagerMasterEndpoint(

// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)

val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
// De-register the block if none of the block managers have it. Otherwise, if pro-active
// replication is enabled, and a block is either an RDD or a test block (the latter is used
// for unit testing), we send a message to a randomly chosen executor location to replicate
// the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
// etc.) as replication doesn't make much sense in that context.
if (locations.size == 0) {
blockLocations.remove(blockId)
logWarning(s"No more replicas available for $blockId !")
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
// As a heuristic, assume a single executor failure to infer the number of replicas that
// existed before the failure
val maxReplicas = locations.size + 1
val i = (new Random(blockId.hashCode)).nextInt(locations.size)
val blockLocations = locations.toSeq
val candidateBMId = blockLocations(i)
blockManagerInfo.get(candidateBMId).foreach { bm =>
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
bm.slaveEndpoint.ask[Boolean](replicateMsg)
}
}
}

listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
logInfo(s"Removing block manager $blockManagerId")

}

private def removeExecutor(execId: String) {
@@ -32,6 +32,10 @@ private[spark] object BlockManagerMessages {
// blocks that the master knows about.
case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave

// Replicate blocks that were lost due to executor failure
case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
extends ToBlockManagerSlave

// Remove all blocks belonging to a specific RDD.
case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave

@@ -74,6 +74,10 @@ class BlockManagerSlaveEndpoint(

case TriggerThreadDump =>
context.reply(Utils.getThreadDump())

case ReplicateBlock(blockId, replicas, maxReplicas) =>
context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))

}

private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
@@ -37,32 +37,31 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.StorageLevel._

/** Testsuite that tests block replication in BlockManager */
class BlockManagerReplicationSuite extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {

private val conf = new SparkConf(false).set("spark.app.id", "test")
private var rpcEnv: RpcEnv = null
private var master: BlockManagerMaster = null
private val securityMgr = new SecurityManager(conf)
private val bcastManager = new BroadcastManager(true, conf, securityMgr)
private val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
private val shuffleManager = new SortShuffleManager(conf)
trait BlockManagerReplicationBehavior extends SparkFunSuite
with Matchers
with BeforeAndAfter
with LocalSparkContext {

val conf: SparkConf
protected var rpcEnv: RpcEnv = null
protected var master: BlockManagerMaster = null
protected lazy val securityMgr = new SecurityManager(conf)
protected lazy val bcastManager = new BroadcastManager(true, conf, securityMgr)
protected lazy val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
protected lazy val shuffleManager = new SortShuffleManager(conf)

// List of block managers created during a unit test, so that all of them can be stopped
// after the unit test.
private val allStores = new ArrayBuffer[BlockManager]
protected val allStores = new ArrayBuffer[BlockManager]

// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
conf.set("spark.kryoserializer.buffer", "1m")
private val serializer = new KryoSerializer(conf)

protected lazy val serializer = new KryoSerializer(conf)

// Implicitly convert strings to BlockIds for test clarity.
private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)

private def makeBlockManager(
protected def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
@@ -355,7 +354,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
* is correct. Then it also drops the block from memory of each store (using LRU) and
* again checks whether the master's knowledge gets updated.
*/
private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
protected def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
import org.apache.spark.storage.StorageLevel._

assert(maxReplication > 1,
@@ -448,3 +447,61 @@
}
}
}

class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
val conf = new SparkConf(false).set("spark.app.id", "test")
conf.set("spark.kryoserializer.buffer", "1m")
}

class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
val conf = new SparkConf(false).set("spark.app.id", "test")
conf.set("spark.kryoserializer.buffer", "1m")
conf.set("spark.storage.replication.proactive", "true")
conf.set("spark.storage.exceptionOnPinLeak", "true")

(2 to 5).foreach{ i =>
test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
testProactiveReplication(i)
}
}

def testProactiveReplication(replicationFactor: Int) {
val blockSize = 1000
val storeSize = 10000
val initialStores = (1 to 10).map { i => makeBlockManager(storeSize, s"store$i") }

val blockId = "a1"

val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)

val blockLocations = master.getLocations(blockId)
logInfo(s"Initial locations : $blockLocations")

assert(blockLocations.size === replicationFactor)

// remove all but one of the block managers holding the block
val executorsToRemove = blockLocations.take(replicationFactor - 1)
logInfo(s"Removing $executorsToRemove")
executorsToRemove.foreach { exec =>
master.removeExecutor(exec.executorId)
// give replication enough time to happen and the new block to be reported to the master
Thread.sleep(200)
}

// give replication enough time to complete and the locks to be released
Thread.sleep(500)

val newLocations = master.getLocations(blockId).toSet
logInfo(s"New locations : $newLocations")
assert(newLocations.size === replicationFactor)
// there should only be one common block manager between initial and new locations
assert(newLocations.intersect(blockLocations.toSet).size === 1)

// check that all the read locks have been released
initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
val locks = bm.releaseAllLocksForTask(BlockInfo.NON_TASK_WRITER)
assert(locks.size === 0, "Read locks unreleased!")
}
}
}
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -952,6 +952,15 @@ Apart from these, the following properties are also available, and may be useful
storage space to unroll the new block in its entirety.
</td>
</tr>
<tr>
<td><code>spark.storage.replication.proactive</code></td>
<td>false</td>
<td>
Enables proactive block replication for RDD blocks. Cached RDD block replicas lost due to
executor failures are replenished if there are any existing available replicas. This tries
to bring the replication level of the block back to its initial value.
</td>
</tr>
</table>

#### Execution Behavior
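
For reference, a minimal sketch of how the new setting could be enabled from application code (the application name is hypothetical; only the proactive-replication property comes from this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application setup; cached RDD blocks stored with replication > 1
// will be re-replicated if an executor holding a replica is lost.
val conf = new SparkConf()
  .setAppName("proactive-replication-demo")
  .set("spark.storage.replication.proactive", "true")

val sc = new SparkContext(conf)
```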