201 changes: 201 additions & 0 deletions core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Failure

import com.google.common.cache.CacheBuilder

import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, Timeout}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.ThreadUtils

/**
* Responsible for asynchronously replicating all of an executor's cached blocks, and then shutting
* it down.
*/
private class CacheRecoveryManager(
blockManagerMasterEndpoint: RpcEndpointRef,
executorAllocationManager: ExecutorAllocationManager,
conf: SparkConf)
extends Logging {

private val forceKillAfterS = conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(threadPool)
private val scheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
private val recoveringExecutors = CacheBuilder.newBuilder()
Contributor

I find recoveringExecutors pretty confusing -- it sounds like executors that are recovering from some problem but are going to be OK, not executors that are about to die, which we are recovering data from. How about drainingExecutors? (Though I have a feeling this name may have been discussed in earlier rounds of comments and this is what we settled on... if so, that's fine.)

.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
.build[String, String]()

/**
* Start the cache recovery shutdown process for these executors.
*
* @param execIds the executors to start shutting down
* @return a future holding a KillReason for each executor, completed once that executor has been killed.
*/
def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.")
val canBeRecovered: Future[Seq[String]] = checkMem(execIds)
Contributor

Shouldn't you immediately kill the executors that don't pass checkMem? Are they getting killed somewhere else?


canBeRecovered.flatMap { execIds =>
execIds.foreach { execId => recoveringExecutors.put(execId, execId) }
Future.sequence(execIds.map { replicateUntilTimeoutThenKill })
}
}

def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = {
val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
val replicationFuture = replicateUntilDone(execId)

Future.firstCompletedOf(List(timeoutFuture, replicationFuture)).andThen {
Contributor
@squito squito Apr 5, 2018

I don't think this will do what you want. Try this code (if you run it in the Scala REPL, be sure to use -Yrepl-class-based: https://stackoverflow.com/a/23111645/1442961):

```scala
import scala.concurrent._
import scala.concurrent.duration._
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}

val scheduler = Executors.newSingleThreadScheduledExecutor()
val pool = Executors.newSingleThreadExecutor()
implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(pool)

def returnAfterTimeout[T](value: T, seconds: Long): Future[T] = {
  val p = Promise[T]()
  val runnable = new Runnable {
    def run(): Unit = { println("time's up"); p.success(value) }
  }
  scheduler.schedule(runnable, seconds, TimeUnit.SECONDS)
  p.future
}

def printStuff(x: Int): String = {
  (0 until x).foreach{ i => println(i); Thread.sleep(1000)}
  "done"
} 

// The *value* of the future is correct here, but you'll notice the timer keeps going, we still see "time's up".
// Not so bad in this case ...
Future.firstCompletedOf(Seq(Future(printStuff(2)), returnAfterTimeout("timeout", 5)))

// The final *value* of the future is correct again here -- we get the timeout.  But (a) you'll see that printStuff keeps
// running anyway and (b) the future isn't actually ready until printStuff completes, even though it should be ready
// as soon as the timer is up
Future.firstCompletedOf(Seq(Future(printStuff(10)), returnAfterTimeout("timeout", 1)))

// Slightly better, the TimeoutException is thrown as soon as the timer is up, but printStuff keeps running anyway
Await.result(Future(printStuff(10)), 1 second)
```

So I'd change this to use ThreadUtils.awaitResult, and you also need your timer to set some condition that the recovery thread checks, so it knows to stop trying to replicate more blocks.
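
For illustration, a minimal sketch of that flag-based approach, assuming a per-call AtomicBoolean that the timer flips and the recovery loop checks before asking for each block; the names below (FlagBasedRecoverySketch, replicateWithDeadline, recoverOneBlock) are hypothetical and not from the patch:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

object FlagBasedRecoverySketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

  // Replicate one block at a time until nothing is left or the deadline passes.
  // The timer only flips a flag; nothing races on the result future itself.
  def replicateWithDeadline(execId: String, timeoutSec: Long)
      (recoverOneBlock: String => Future[Boolean]): Future[String] = {
    val timedOut = new AtomicBoolean(false)
    scheduler.schedule(
      new Runnable { def run(): Unit = timedOut.set(true) }, timeoutSec, TimeUnit.SECONDS)

    def loop(): Future[String] =
      if (timedOut.get()) {
        Future.successful("Timeout")          // give up; the caller can kill the executor
      } else {
        recoverOneBlock(execId).flatMap { moreBlocks =>
          if (moreBlocks) loop() else Future.successful("DoneRecovering")
        }
      }

    loop()
  }
}
```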

Author

Ah, so my original thought was that whatever future finished first, the replication future or the timeout future, the other would complete harmlessly in the background. I didn't realize that the future returned by firstCompletedOf wouldn't complete until both were done. I will fix this. Thanks

case scala.util.Success(DoneRecovering) =>
logTrace(s"Done recovering blocks on $execId, killing now")
case scala.util.Success(Timeout) =>
logWarning(s"Couldn't recover cache on $execId before $forceKillAfterS second timeout")
case Failure(ex) =>
logWarning(s"Error recovering cache on $execId", ex)
}.andThen {
case _ =>
kill(execId)
}
}

/**
* Given a list of executors that will be shut down, check if there is enough free memory on the
* rest of the cluster to hold their data. Return a list of just the executors for which there
* will be enough space. Executors are included smallest first.
*
* This is a best-guess implementation, and it is not guaranteed that all returned executors
* will succeed. For example, a block might be too big to fit on any one specific executor.
*
* @param execIds executors which will be shut down
* @return a Seq of the executors we do have room for
*/
private def checkMem(execIds: Seq[String]): Future[Seq[String]] = {
val execsToShutDown = execIds.toSet
// Memory Status is a map of executor Id to a tuple of Max Memory and remaining memory on that
// executor.
val futureMemStatusByBlockManager =
blockManagerMasterEndpoint.ask[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)

val futureMemStatusByExecutor = futureMemStatusByBlockManager.map { memStat =>
memStat.map { case (blockManagerId, mem) => blockManagerId.executorId -> mem }
}

futureMemStatusByExecutor.map { memStatusByExecutor =>
val (expiringMemStatus, remainingMemStatus) = memStatusByExecutor.partition {
case (execId, _) => execsToShutDown.contains(execId)
}
val freeMemOnRemaining = remainingMemStatus.values.map(_._2).sum

// The used mem on each executor sorted from least used mem to greatest
val executorAndUsedMem: Seq[(String, Long)] =
expiringMemStatus.map { case (execId, (maxMem, remainingMem)) =>
val usedMem = maxMem - remainingMem
execId -> usedMem
}.toSeq.sortBy { case (_, usedMem) => usedMem }

executorAndUsedMem
.scan(("start", freeMemOnRemaining)) {
case ((_, freeMem), (execId, usedMem)) => (execId, freeMem - usedMem)
}
.drop(1)
.filter { case (_, freeMem) => freeMem > 0 }
.map(_._1)
}
}
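
As a toy illustration (not part of the patch) of the best-guess fit described above, with made-up numbers, showing how the scan drops candidates once the survivors' free memory runs out:

```scala
// 10 units free on the surviving executors; candidates already sorted by used memory.
val freeMemOnRemaining = 10L
val executorAndUsedMem = Seq("exec-3" -> 2L, "exec-1" -> 4L, "exec-2" -> 7L)

val recoverable = executorAndUsedMem
  .scan(("start", freeMemOnRemaining)) {
    case ((_, freeMem), (execId, usedMem)) => (execId, freeMem - usedMem)
  }
  .drop(1)                                     // drop the seed element
  .filter { case (_, freeMem) => freeMem > 0 }
  .map(_._1)
// recoverable == Seq("exec-3", "exec-1"): 10 - 2 = 8, then 8 - 4 = 4, but 4 - 7 < 0,
// so exec-2 (and anything after it) is not considered recoverable.
```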

/**
* Given a value and a timeout in seconds, complete the future with the value when time is up.
*
* @param value The value to be returned after timeout period
* @param seconds the number of seconds to wait
* @return a future that will hold the value given after a timeout
*/
private def returnAfterTimeout[T](value: T, seconds: Long): Future[T] = {
val p = Promise[T]()
val runnable = new Runnable {
def run(): Unit = { p.success(value) }
}
scheduler.schedule(runnable, seconds, TimeUnit.SECONDS)
p.future
}

/**
* Recover cached RDD blocks off of an executor until there are no more, or until
* there is an error
*
* @param execId the id of the executor to be killed
* @return a Future of Unit that will complete once all blocks have been replicated.
*/
private def replicateUntilDone(execId: String): Future[KillReason] = {
Contributor

return type in doc is wrong

recoverLatestBlock(execId).flatMap { moreBlocks =>
if (moreBlocks) replicateUntilDone(execId) else Future.successful(DoneRecovering)
}
}

/**
* Ask the BlockManagerMaster to replicate the latest cached RDD block off of this executor onto
* a surviving executor, and then remove the block from this executor.
*
* @param execId the executor to recover a block from
* @return A future that will hold true if a block was recovered, false otherwise.
*/
private def recoverLatestBlock(execId: String): Future[Boolean] = {
blockManagerMasterEndpoint
.ask[Boolean](RecoverLatestRDDBlock(execId, recoveringExecutors.asMap.keySet.asScala.toSeq))
Contributor

So the reason you do this one block at a time is that you update recoveringExecutors between each call, right? To help avoid replicating to another executor which will start draining its blocks after this one starts replicating its first block, but before it starts replicating later blocks?

If so, that explanation should go in a comment.
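
A comment along these lines, sketched here with hypothetical wording above the existing ask call, would capture that:

```scala
// Recover blocks one at a time so that recoveringExecutors is re-read before each
// request; otherwise a block could be replicated onto an executor that started
// draining after this executor's first block was moved.
blockManagerMasterEndpoint
  .ask[Boolean](RecoverLatestRDDBlock(execId, recoveringExecutors.asMap.keySet.asScala.toSeq))
```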

}

/**
* Remove the executor from the list of currently recovering executors and then kill it.
*
* @param execId the id of the executor to be killed
*/
private def kill(execId: String): Unit = {
executorAllocationManager.killExecutors(Seq(execId))
}

/**
* Stops all thread pools
*/
def stop(): Unit = {
threadPool.shutdownNow()
scheduler.shutdownNow()
}
}

private object CacheRecoveryManager {
def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = {
val bmme = SparkEnv.get.blockManager.master.driverEndpoint
new CacheRecoveryManager(bmme, eam, conf)
}

sealed trait KillReason
case object Timeout extends KillReason
case object DoneRecovering extends KillReason
}
@@ -85,4 +85,10 @@ private[spark] trait ExecutorAllocationClient {
countFailures = false)
killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
}

/**
* Mark these executors as pending to be removed
* @param executorIds Executors that will be removed and should not accept new work.
*/
def markPendingToRemove(executorIds: Seq[String]): Unit
}
108 changes: 58 additions & 50 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
import org.apache.spark.internal.config._
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
@@ -90,6 +90,8 @@ private[spark] class ExecutorAllocationManager(

import ExecutorAllocationManager._

private var cacheRecoveryManager: CacheRecoveryManager = _

// Lower and upper bounds on the number of executors.
private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
@@ -110,6 +112,9 @@ private[spark] class ExecutorAllocationManager(
private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")

// whether or not to try and save cached data when executors are deallocated
private val recoverCachedData = conf.get(DYN_ALLOCATION_CACHE_RECOVERY)

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

@@ -212,6 +217,11 @@ private[spark] class ExecutorAllocationManager(
if (tasksPerExecutor == 0) {
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
}

if (recoverCachedData && cachedExecutorIdleTimeoutS == Integer.MAX_VALUE) {
throw new SparkException(s"spark.dynamicAllocation.cachedExecutorIdleTimeout must be set if " +
s"${DYN_ALLOCATION_CACHE_RECOVERY.key} is true.")
}
}

/**
@@ -243,12 +253,19 @@ private[spark] class ExecutorAllocationManager(
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)

client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)

if (recoverCachedData) {
cacheRecoveryManager = CacheRecoveryManager(this, conf)
}
}

/**
* Stop the allocation manager.
*/
def stop(): Unit = {
if (cacheRecoveryManager != null) {
cacheRecoveryManager.stop()
}
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
}
@@ -432,68 +449,59 @@ private[spark] class ExecutorAllocationManager(

/**
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*
* @param executors the ids of the executors to be removed
*
* @return Unit
*/
private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
val executorIdsToBeRemoved = new ArrayBuffer[String]

private def removeExecutors(executors: Seq[String]): Unit = synchronized {
Contributor

return in the doc is wrong

Author

fixed

Contributor

Since you're not actually removing executors here immediately with the new cache recovery path, you should update the doc to describe that too.

Also we only mention the return type if we've got something interesting to say about it, so you can just skip mentioning it entirely.

logInfo("Request to remove executorIds: " + executors.mkString(", "))
val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size

var newExecutorTotal = numExistingExecutors
executors.foreach { executorIdToBeRemoved =>
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
} else if (newExecutorTotal - 1 < numExecutorsTarget) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
} else if (canBeKilled(executorIdToBeRemoved)) {
executorIdsToBeRemoved += executorIdToBeRemoved
newExecutorTotal -= 1
val numExistingExecs = allocationManager.executorIds.size - executorsPendingToRemove.size
val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
val (executorIdsToBeRemoved, dontRemove) = executors
.filter(canBeKilled)
.splitAt(numExistingExecs - execCountFloor)

if (log.isDebugEnabled()) {
dontRemove.foreach { execId =>
logDebug(s"Not removing idle executor $execId because it " +
s"would put us below the minimum limit of $minNumExecutors executors" +
s"or number of target executors $numExecutorsTarget")
}
}

if (executorIdsToBeRemoved.isEmpty) {
return Seq.empty[String]
Seq.empty[String]
} else if (testing) {
recordExecutorKill(executorIdsToBeRemoved)
} else if (recoverCachedData) {
client.markPendingToRemove(executorIdsToBeRemoved)
recordExecutorKill(executorIdsToBeRemoved)
cacheRecoveryManager.startCacheRecovery(executorIdsToBeRemoved)
} else {
val killed = killExecutors(executorIdsToBeRemoved)
recordExecutorKill(killed)
}
}

// Send a request to the backend to kill this executor(s)
val executorsRemoved = if (testing) {
executorIdsToBeRemoved
} else {
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased.
client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
def killExecutors(executorIds: Seq[String]): Seq[String] = {
logDebug(s"Starting kill process for ${executorIds.mkString(", ")}")
val result = client.killExecutors(executorIds, adjustTargetNumExecutors = false,
countFailures = false, force = false)
}
// [SPARK-21834] killExecutors api reduces the target number of executors.
// So we need to update the target with desired value.
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
// reset the newExecutorTotal to the existing number of executors
newExecutorTotal = numExistingExecutors
if (testing || executorsRemoved.nonEmpty) {
executorsRemoved.foreach { removedExecutorId =>
newExecutorTotal -= 1
logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
executorsPendingToRemove.add(removedExecutorId)
}
executorsRemoved
} else {
if (result.isEmpty) {
logWarning(s"Unable to reach the cluster manager to kill executor/s " +
s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
Seq.empty[String]
s"${executorIds.mkString(",")} or no executor eligible to kill!")
}
result
}

/**
* Request the cluster manager to remove the given executor.
* Return whether the request is acknowledged.
*/
private def removeExecutor(executorId: String): Boolean = synchronized {
val executorsRemoved = removeExecutors(Seq(executorId))
executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
private def recordExecutorKill(executorsRemoved: Seq[String]): Unit = synchronized {
// [SPARK-21834] killExecutors api reduces the target number of executors.
// So we need to update the target with desired value.
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
executorsPendingToRemove ++= executorsRemoved
logInfo(s"Removing executors (${executorsRemoved.mkString(", ")}) because they have been idle" +
s"for $executorIdleTimeoutS seconds")
}

/**
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -130,6 +130,16 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")

private[spark] val DYN_ALLOCATION_CACHE_RECOVERY =
ConfigBuilder("spark.dynamicAllocation.cacheRecovery.enabled")
Contributor

Your PR summary doesn't match these anymore. I'd just remove the actual names of the configs from the summary.

Author

Removed reference to the configs in the PR summary and JIRA ticket.

.booleanConf
.createWithDefault(false)

private[spark] val DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.cacheRecovery.timeout")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(120)

private[spark] val SHUFFLE_SERVICE_ENABLED =
ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
