[SPARK-15355] [CORE] Proactive block replication #14412

Closed

Conversation

shubhamchopra
Contributor

What changes were proposed in this pull request?

We propose adding pro-active block replication in case of executor failures. BlockManagerMasterEndpoint does all the book-keeping needed to track the executors and the blocks they hold, and it tracks which executors are alive through heartbeats. When an executor is removed, this book-keeping state is updated to reflect the lost executor. At that point we can identify the executors that still hold a copy of the cached data and send them a message asking them to use the existing "replicate" function to find and place new replicas on other suitable hosts. Blocks replicated this way let the master know of their existence.

Replication is triggered when an executor is lost, which makes it pro-active, as opposed to replication being done lazily at query time.
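As an illustration only (not part of the patch), a minimal sketch of how a user might exercise this from application code, assuming the spark.storage.replication.proactive flag discussed later in this PR:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Hypothetical driver program: cache an RDD with two replicas per block. With
    // pro-active replication enabled, losing an executor that holds one replica
    // should cause the master to ask a surviving executor to re-create the lost
    // copy, instead of waiting until the block is next read.
    object ProactiveReplicationExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("proactive-replication-example")
          .set("spark.storage.replication.proactive", "true") // flag added by this PR
        val sc = new SparkContext(conf)
        val data = sc.parallelize(1 to 1000000, numSlices = 8)
          .persist(StorageLevel.MEMORY_ONLY_2) // two replicas of each cached block
        println(data.count()) // materialize the cached blocks
        sc.stop()
      }
    }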

How was this patch tested?

This patch was tested with existing unit tests along with new unit tests added to test the functionality.

@@ -37,10 +37,11 @@ import org.apache.spark.util.Utils
class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int)
private var port_ : Int,
private var topologyInfo_ : Option[String])
Contributor

we need to document what information this parameter contains (and in what format).

Contributor Author

Added documentation about the parameter in the constructor.

@shubhamchopra shubhamchopra force-pushed the ProactiveBlockReplication branch from 30edb1e to 3cf2978 Compare August 9, 2016 15:59
@shubhamchopra shubhamchopra force-pushed the ProactiveBlockReplication branch from 3cf2978 to 016ea9f Compare November 2, 2016 16:14
@shubhamchopra shubhamchopra changed the title [SPARK-15355] [CORE] [WIP] Proactive block replication [SPARK-15355] [CORE] Proactive block replication Jan 23, 2017
@sameeragarwal
Member

jenkins ok to test

@SparkQA

SparkQA commented Jan 31, 2017

Test build #72187 has started for PR 14412 at commit 016ea9f.

Member

@sameeragarwal sameeragarwal left a comment

Thanks @shubhamchopra, I left a few comments and questions but overall this approach looks quite reasonable to me.

* @param maxReps maximum replicas needed
* @return
*/
def replicateBlock(blockId: BlockId, replicas: Set[BlockManagerId], maxReps: Int): Boolean = {
Member

How about something like this for better readability?

def replicateBlock(blockId: BlockId, existingReplicas: Set[BlockManagerId], maxReplicas: Int)

Also, is there a reason this returns a Boolean?

Contributor Author

This doesn't need to return a boolean. Changing the return type to Unit. Also changing the variable names.

* @param data
* @param level
* @param classTag
* @param existingReplicas
Member

Let's either document these params or just remove them

Contributor Author

Removing these.

blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
Member

nit: this still needs fixing

Contributor Author

@sameeragarwal This code is being removed as a part of this PR. Code replacing this has this fixed.

var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0

val initialPeers = {
Member

@sameeragarwal sameeragarwal Jan 31, 2017

can this not be just:

val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))

Contributor Author

Done

!peersForReplication.isEmpty &&
peersReplicatedTo.size != numPeersToReplicateTo) {
!peersForReplication.isEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
Member

I think it's still valid to replace the inequality check with a strictly-less-than check, but just out of curiosity: can the number of peersReplicatedTo ever exceed numPeersToReplicateTo?

Contributor Author

One scenario I can think of is an executor holding the block being replicated getting lost (due to, say, a delayed heartbeat) and then joining back again. The current implementation would recognize that the block manager needs to re-register and would report all of its blocks. The probability of this happening increases with pro-active replication, I think.

@@ -188,24 +189,45 @@ class BlockManagerMasterEndpoint(
}

private def removeBlockManager(blockManagerId: BlockManagerId) {
val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
Member

can probably move this out of the function block

Contributor Author

Removed from the function.

val info = blockManagerInfo(blockManagerId)

// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId

// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
Member

@sameeragarwal sameeragarwal Jan 31, 2017

why did you move this at the end (i.e., after replicating the blocks and updating blockLocations)?

val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
if (locations.size == 0) {
blockLocations.remove(blockId)
logWarning(s"No more replicas available for $blockId !")
} else if ((blockId.isRDD || blockId.isInstanceOf[TestBlockId]) && proactivelyReplicate) {
// we only need to proactively replicate RDD blocks
Member

I think it makes sense overall, but it'd be great to add some more comments about why we are only concerned with RDD blocks and not others.

Member

Also, might be nicer to make proactivelyReplicate the first check for better short-circuiting.

// we also need to replicate this behavior for test blocks for unit tests
// we send a message to a randomly chosen executor location to replicate block
// assuming single executor failure, we find out how many replicas existed before failure
val maxReplicas = locations.size + 1
Member

What happens if multiple executors are removed simultaneously? Depending on the invocation sequence, is it possible for maxReplicas to be significantly less than the original number of replicas?

Contributor Author

Yes, that's a tough one. The way replication is implemented, the correct storage level is only available with one of the blocks at the BlockManager layer (we don't have access to the RDD this block is a part of, so we can't extract the information from there). The remaining replicas all have storage levels with replication set to 1. So I use the number of known locations to approximate the intended replication level.
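To make the approximation concrete, a small illustrative sketch (not from the patch) of the arithmetic under the single-failure assumption:

    // Suppose a block was cached with replication = 3 and one executor is lost.
    // The master only sees the surviving locations, so it assumes exactly one
    // replica was lost and reconstructs the pre-failure count from that.
    val survivingLocations = 2                // locations.size after the failure
    val maxReplicas = survivingLocations + 1  // == 3, the assumed original count
    assert(maxReplicas == 3)

If two executors holding the same block fail before the master processes both removals, this heuristic will undershoot the original replication level, which is the corner case raised above.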

@SparkQA

SparkQA commented Feb 3, 2017

Test build #72291 has finished for PR 14412 at commit 16975b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@sameeragarwal sameeragarwal left a comment

LGTM, modulo some minor nits. @JoshRosen can you please take a look too?

blockId: BlockId,
data: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
Member

nit: this still needs fixing

val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
if (locations.size == 0) {
blockLocations.remove(blockId)
logWarning(s"No more replicas available for $blockId !")
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
// only RDD blocks store data that users explicitly cache so we only need to proactively
Member

Just a nit, but how about we split this comment into two separate ones for better readability around the two separate set of issues. My suggestion:

      // De-register the block if none of the block managers have it. Otherwise, if pro-active
      // replication is enabled, and a block is either an RDD or a test block (the latter is used
      // for unit testing), we send a message to a randomly chosen executor location to replicate
      // the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
      // etc.) as replication doesn't make much sense in that context.
      if (locations.size == 0) {
        blockLocations.remove(blockId)
      } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
        // As a heuristic, assume a single executor failure to find out the number of replicas
        // that existed before the failure.
        val maxReplicas = locations.size + 1

        val i = (new Random(blockId.hashCode)).nextInt(locations.size)
        val blockLocations = locations.toSeq
        val candidateBMId = blockLocations(i)
        val blockManager = blockManagerInfo.get(candidateBMId)
        if (blockManager.isDefined) {
          val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
          val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
          blockManager.get.slaveEndpoint.ask[Boolean](replicateMsg)
        }
      }

Contributor

+1 on Sameer's suggestions. This code is a little subtle and benefits from a clearer comment.

Contributor Author

Agree with the suggestion. Modifying it. Thanks!

Contributor

@JoshRosen JoshRosen left a comment

I just took a review pass. I have some concerns about the release of the readLock which is acquired in the replication code. Unless I'm overlooking something, I think it's not currently released following the proactive replication.

I also left several small comments on how I think the code could be slightly simplified. I'll also echo Sameer's review feedback regarding minor code style issues and clarification of some of the comments.

* @return
*/
def replicateBlock(
blockId: BlockId,
Contributor

Same as Sameer's comment elsewhere in the code, we should fix the indentation here.

* @param blockId blockId being replicated
* @param existingReplicas existing block managers that have a replica
* @param maxReplicas maximum replicas needed
* @return
Contributor

You can omit this @return since this method doesn't have a return value.

val blockLocations = locations.toSeq
val candidateBMId = blockLocations(i)
val blockManager = blockManagerInfo.get(candidateBMId)
if(blockManager.isDefined) {
Contributor

Nit: space after if.

Contributor

If you're not going to have an else branch here, then you might as well just foreach over the result of blockManagerInfo.get(candidateBMId).
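For example, the suggestion amounts to something like this sketch (using the names from the snippet above):

    // The body runs only when a BlockManagerInfo exists for the candidate, so the
    // isDefined/get pair is not needed.
    blockManagerInfo.get(candidateBMId).foreach { bm =>
      val remainingLocations = locations.toSeq.filter(_ != candidateBMId)
      val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
      bm.slaveEndpoint.ask[Boolean](replicateMsg)
    }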

val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
info.level.useDisk,
Contributor

Minor nit, but a problem with the StorageLevel constructor is that it has a bunch of adjacent boolean parameters, so in such cases I'd usually prefer to name all of the parameters explicitly at the call site in order to avoid errors should these lines get permuted / to convince readers that the API is being used correctly.

Thus I'd probably write each line like useDisk = info.level.useDisk, etc.
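As a sketch of that convention, using the field names from the diff above:

    // Naming each boolean parameter guards against silently swapped arguments
    // if these lines are ever reordered.
    val storageLevel = StorageLevel(
      useDisk = info.level.useDisk,
      useMemory = info.level.useMemory,
      useOffHeap = info.level.useOffHeap,
      deserialized = info.level.deserialized,
      replication = maxReplicas)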

Contributor Author

Done

existingReplicas: Set[BlockManagerId],
maxReplicas: Int): Unit = {
logInfo(s"Pro-actively replicating $blockId")
val infoForReplication = blockInfoManager.lockForReading(blockId).map { info =>
Contributor

This call acquires a read lock on the block, but when is that lock released? Per the Scaladoc of doGetLocalBytes, you need to be holding a read lock before calling that method, but upon successful return from that method the read lock will still be held by the caller.

I think what you want to do is acquire the lock, immediately call doGetLocalBytes, then begin a try-finally statement to call replicate() and unlock / release the lock in the finally block.

Contributor

Also, I don't think there's a need to have separate .map and .foreach calls over the option. Instead, I think it would be clearer to avoid the assignment to the infoForReplication variable and just perform all of the work inside of a .foreach call on the Option with the block info.

Contributor Author

Yes, that makes sense.
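Putting the two suggestions together, the structure might look roughly like this sketch (releaseLock is assumed here to be the existing BlockManager helper that drops the read lock taken by lockForReading):

    // Acquire the read lock, read the bytes while holding it, then make sure the
    // lock is released even if replication throws.
    blockInfoManager.lockForReading(blockId).foreach { info =>
      val data = doGetLocalBytes(blockId, info)
      val storageLevel = StorageLevel(
        useDisk = info.level.useDisk,
        useMemory = info.level.useMemory,
        useOffHeap = info.level.useOffHeap,
        deserialized = info.level.deserialized,
        replication = maxReplicas)
      try {
        replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
      } finally {
        releaseLock(blockId) // assumed helper releasing the read lock taken above
      }
    }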

Member

Nice catch! Can we also assert that all locks are released somewhere in testProactiveReplication?

Contributor

I think that we can set spark.storage.exceptionOnPinLeak to true in SparkConf to do this.
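For reference, a sketch of how that setting might be enabled in a test configuration (assuming the existing spark.storage.exceptionOnPinLeak flag behaves as described above):

    import org.apache.spark.SparkConf

    // Fail the test if a task completes while still holding block read locks (pins).
    val conf = new SparkConf()
      .set("spark.storage.exceptionOnPinLeak", "true")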

val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockLocations.get(blockId)
locations -= blockManagerId
if (locations.size == 0) {
blockLocations.remove(blockId)
logWarning(s"No more replicas available for $blockId !")
} else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
// only RDD blocks store data that users explicitly cache so we only need to proactively
Contributor

+1 on Sameer's suggestions. This code is a little subtle and benefits from a clearer comment.

// assuming single executor failure, we find out how many replicas existed before failure
val maxReplicas = locations.size + 1

val i = (new Random(blockId.hashCode)).nextInt(locations.size)
Contributor

Why do we need to use a fixed random seed here? Testing?

Also, isn't there a Random.choice() that you can use for this? Or a method like that in our own Utils class?

Contributor Author

The Scala Random API doesn't have a choice method, and the Spark Utils class has methods to shuffle a collection, but not to pick a random element.
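A minimal sketch of the workaround used here, i.e. picking a random element by index (illustration only, not code from the patch):

    import scala.util.Random

    // Choose one element of a sequence with a bounded random index; seeding with
    // the block id keeps the choice deterministic for a given block.
    def chooseRandom[T](items: Seq[T], seed: Int): T =
      items(new Random(seed).nextInt(items.size))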

val candidateBMId = blockLocations(i)
val blockManager = blockManagerInfo.get(candidateBMId)
if(blockManager.isDefined) {
val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
Contributor

Is it possible for this list to be empty in certain corner-cases? What happens if ReplicateBlock is called with an empty set of locations? Is it just a no-op in that case?

Contributor Author

If we are at this point, there would be at least one location holding the block, and it will get chosen as the candidate here. remainingLocations just tells the replication logic where other replicas are present (it can be an empty set), so it can use that information while choosing candidate executors for replication.

@@ -65,6 +66,8 @@ class BlockManagerMasterEndpoint(
mapper
}

val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
Contributor

Please document this new configuration in docs/configuration.md.

@SparkQA

SparkQA commented Feb 18, 2017

Test build #73074 has finished for PR 14412 at commit beb9eb3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 18, 2017

Test build #73075 has finished for PR 14412 at commit 275cbea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@JoshRosen JoshRosen left a comment

Thanks for incorporating my feedback. LGTM.

@shubhamchopra
Contributor Author

"spark.storage.exceptionOnPinLeak" based check only works if executors are created. I put in an assertion check using logic similar to it in the testProactiveReplication tests.

@SparkQA

SparkQA commented Feb 23, 2017

Test build #73364 has finished for PR 14412 at commit cee8e76.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 24, 2017

Test build #73431 has finished for PR 14412 at commit 212baab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@sameeragarwal sameeragarwal left a comment

Looks good, thanks!

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in fa7c582 Feb 24, 2017
Yunni pushed a commit to Yunni/spark that referenced this pull request Feb 27, 2017
## What changes were proposed in this pull request?

We propose adding pro-active block replication in case of executor failures. BlockManagerMasterEndpoint does all the book-keeping needed to track the executors and the blocks they hold, and it tracks which executors are alive through heartbeats. When an executor is removed, this book-keeping state is updated to reflect the lost executor. At that point we can identify the executors that still hold a copy of the cached data and send them a message asking them to use the existing "replicate" function to find and place new replicas on other suitable hosts. Blocks replicated this way let the master know of their existence.

Replication is triggered when an executor is lost, which makes it pro-active, as opposed to replication being done lazily at query time.
## How was this patch tested?

This patch was tested with existing unit tests along with new unit tests added to test the functionality.

Author: Shubham Chopra <schopra31@bloomberg.net>

Closes apache#14412 from shubhamchopra/ProactiveBlockReplication.