[SPARK-21097][CORE] Add option to recover cached data #19041
@@ -0,0 +1,201 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Failure

import com.google.common.cache.CacheBuilder

import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, Timeout}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.ThreadUtils

/**
 * Responsible for asynchronously replicating all of an executor's cached blocks, and then shutting
 * it down.
 */
private class CacheRecoveryManager(
    blockManagerMasterEndpoint: RpcEndpointRef,
    executorAllocationManager: ExecutorAllocationManager,
    conf: SparkConf)
  extends Logging {

  private val forceKillAfterS = conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
  private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(threadPool)
  private val scheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
  private val recoveringExecutors = CacheBuilder.newBuilder()
    .expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
    .build[String, String]()

  /**
   * Start the cache recovery and shutdown process for these executors.
   *
   * @param execIds the executors to start shutting down
   * @return a future of the kill reasons, which completes once the executors have been killed.
   */
  def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
    logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.")
    val canBeRecovered: Future[Seq[String]] = checkMem(execIds)

    canBeRecovered.flatMap { execIds =>
      execIds.foreach { execId => recoveringExecutors.put(execId, execId) }
      Future.sequence(execIds.map { replicateUntilTimeoutThenKill })
    }
  }

  def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = {
    val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
    val replicationFuture = replicateUntilDone(execId)

    Future.firstCompletedOf(List(timeoutFuture, replicationFuture)).andThen {
Contributor

I don't think this will do what you want. Try this code (if you do it in the scala repl, be sure to use

import scala.concurrent._
import scala.concurrent.duration._
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}

val scheduler = Executors.newSingleThreadScheduledExecutor()
val pool = Executors.newSingleThreadExecutor()
implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(pool)

def returnAfterTimeout[T](value: T, seconds: Long): Future[T] = {
  val p = Promise[T]()
  val runnable = new Runnable {
    def run(): Unit = { println("time's up"); p.success(value) }
  }
  scheduler.schedule(runnable, seconds, TimeUnit.SECONDS)
  p.future
}

def printStuff(x: Int): String = {
  (0 until x).foreach { i => println(i); Thread.sleep(1000) }
  "done"
}

// The *value* of the future is correct here, but you'll notice the timer keeps going, we still see "time's up".
// Not so bad in this case ...
Future.firstCompletedOf(Seq(Future(printStuff(2)), returnAfterTimeout("timeout", 5)))

// The final *value* of the future is correct again here -- we get the timeout. But (a) you'll see that printStuff keeps
// running anyway and (b) the future isn't actually ready until printStuff completes, even though it should be ready
// as soon as the timer is up
Future.firstCompletedOf(Seq(Future(printStuff(10)), returnAfterTimeout("timeout", 1)))

// Slightly better, the TimeoutException is thrown as soon as the timer is up, but printStuff keeps running anyway
Await.result(Future(printStuff(10)), 1 second)

So I'd change this to use ThreadUtils.awaitResult and also you need your timer to set some condition which the recovery thread is checking, so it knows to stop trying to replicate more blocks.

Author

Ah, so my original thought was that whatever future finished first, the replication future or the timeout future, the other would complete harmlessly in the background. I didn't realize that the future returned by firstCompletedOf wouldn't complete until both were done. I will fix this. Thanks
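One way to read the suggestion above is to have the timer do nothing but flip a flag that the replication loop polls before each block, so replication stops promptly once time is up and the kill can proceed. The sketch below is illustrative only and is not the code in this PR; `TimeoutFlagSketch` and `recoverOneBlock` are hypothetical stand-ins.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.{ExecutionContext, Future}

object TimeoutFlagSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

  // Hypothetical stand-in for replicating one block; returns true while blocks remain
  // (here it pretends there is nothing left to move).
  private def recoverOneBlock(execId: String): Boolean = { Thread.sleep(100); false }

  def replicateUntilDoneOrTimeout(execId: String, timeoutSeconds: Long): Future[String] = {
    val timedOut = new AtomicBoolean(false)
    // The timer only sets a flag; it does not complete any future itself.
    scheduler.schedule(new Runnable { def run(): Unit = timedOut.set(true) },
      timeoutSeconds, TimeUnit.SECONDS)

    // The replication loop checks the flag before every block, so it stops trying
    // to replicate as soon as the timer fires.
    Future {
      var moreBlocks = true
      while (moreBlocks && !timedOut.get()) {
        moreBlocks = recoverOneBlock(execId)
      }
      if (timedOut.get()) "Timeout" else "DoneRecovering"
    }
  }
}
```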
      case scala.util.Success(DoneRecovering) =>
        logTrace(s"Done recovering blocks on $execId, killing now")
      case scala.util.Success(Timeout) =>
        logWarning(s"Couldn't recover cache on $execId before $forceKillAfterS second timeout")
      case Failure(ex) =>
        logWarning(s"Error recovering cache on $execId", ex)
    }.andThen {
      case _ =>
        kill(execId)
    }
  }

  /**
   * Given a list of executors that will be shut down, check if there is enough free memory on the
   * rest of the cluster to hold their data. Return a list of just the executors for which there
   * will be enough space. Executors are included smallest first.
   *
   * This is a best-guess implementation and it is not guaranteed that all returned executors
   * will succeed. For example, a block might be too big to fit on any one specific executor.
   *
   * @param execIds executors which will be shut down
   * @return a Seq of the executors we do have room for
   */
  private def checkMem(execIds: Seq[String]): Future[Seq[String]] = {
    val execsToShutDown = execIds.toSet
    // Memory status is a map of executor id to a tuple of max memory and remaining memory on
    // that executor.
    val futureMemStatusByBlockManager =
      blockManagerMasterEndpoint.ask[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)

    val futureMemStatusByExecutor = futureMemStatusByBlockManager.map { memStat =>
      memStat.map { case (blockManagerId, mem) => blockManagerId.executorId -> mem }
    }

    futureMemStatusByExecutor.map { memStatusByExecutor =>
      val (expiringMemStatus, remainingMemStatus) = memStatusByExecutor.partition {
        case (execId, _) => execsToShutDown.contains(execId)
      }
      val freeMemOnRemaining = remainingMemStatus.values.map(_._2).sum

      // The used mem on each executor, sorted from least used mem to greatest
      val executorAndUsedMem: Seq[(String, Long)] =
        expiringMemStatus.map { case (execId, (maxMem, remainingMem)) =>
          val usedMem = maxMem - remainingMem
          execId -> usedMem
        }.toSeq.sortBy { case (_, usedMem) => usedMem }

      executorAndUsedMem
        .scan(("start", freeMemOnRemaining)) {
          case ((_, freeMem), (execId, usedMem)) => (execId, freeMem - usedMem)
        }
        .drop(1)
        .filter { case (_, freeMem) => freeMem > 0 }
        .map(_._1)
    }
  }

  /**
   * Given a value and a timeout in seconds, complete the future with the value when time is up.
   *
   * @param value the value to be returned after the timeout period
   * @param seconds the number of seconds to wait
   * @return a future that will hold the value given after a timeout
   */
  private def returnAfterTimeout[T](value: T, seconds: Long): Future[T] = {
    val p = Promise[T]()
    val runnable = new Runnable {
      def run(): Unit = { p.success(value) }
    }
    scheduler.schedule(runnable, seconds, TimeUnit.SECONDS)
    p.future
  }

  /**
   * Recover cached RDD blocks off of an executor until there are no more, or until
   * there is an error.
   *
   * @param execId the id of the executor to be killed
   * @return a Future that will complete with DoneRecovering once all blocks have been replicated
   */
  private def replicateUntilDone(execId: String): Future[KillReason] = {
    recoverLatestBlock(execId).flatMap { moreBlocks =>
      if (moreBlocks) replicateUntilDone(execId) else Future.successful(DoneRecovering)
    }
  }

  /**
   * Ask the BlockManagerMaster to replicate the latest cached rdd block off of this executor on to
   * a surviving executor, and then remove the block from this executor.
   *
   * @param execId the executor to recover a block from
   * @return a future that will hold true if a block was recovered, false otherwise
   */
  private def recoverLatestBlock(execId: String): Future[Boolean] = {
    blockManagerMasterEndpoint
      .ask[Boolean](RecoverLatestRDDBlock(execId, recoveringExecutors.asMap.keySet.asScala.toSeq))
  }

  /**
   * Remove the executor from the list of currently recovering executors and then kill it.
   *
   * @param execId the id of the executor to be killed
   */
  private def kill(execId: String): Unit = {
    executorAllocationManager.killExecutors(Seq(execId))
  }

  /**
   * Stops all thread pools.
   */
  def stop(): Unit = {
    threadPool.shutdownNow()
    scheduler.shutdownNow()
  }
}

private object CacheRecoveryManager {
  def apply(eam: ExecutorAllocationManager, conf: SparkConf): CacheRecoveryManager = {
    val bmme = SparkEnv.get.blockManager.master.driverEndpoint
    new CacheRecoveryManager(bmme, eam, conf)
  }

  sealed trait KillReason
  case object Timeout extends KillReason
  case object DoneRecovering extends KillReason
}
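To make the free-memory bookkeeping in `checkMem` concrete, here is a small, self-contained illustration of the `scan`/`drop`/`filter` pipeline above. The executor ids and sizes are hypothetical and chosen only to show how the running total excludes executors that no longer fit.

```scala
object CheckMemScanExample extends App {
  // Hypothetical inputs: 10 units free on the surviving executors, and three expiring
  // executors using 3, 4 and 5 units of cached data, already sorted smallest first.
  val freeMemOnRemaining = 10L
  val executorAndUsedMem = Seq("exec-3" -> 3L, "exec-1" -> 4L, "exec-2" -> 5L)

  val fits = executorAndUsedMem
    .scan(("start", freeMemOnRemaining)) {
      case ((_, freeMem), (execId, usedMem)) => (execId, freeMem - usedMem)
    }
    .drop(1)                                    // drop the ("start", 10) seed element
    .filter { case (_, freeMem) => freeMem > 0 }
    .map(_._1)

  // Running totals are 7, 3, -2, so only the first two executors are kept.
  println(fits) // List(exec-3, exec-1)
}
```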
@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
import org.apache.spark.internal.config._
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster

@@ -90,6 +92,8 @@ private[spark] class ExecutorAllocationManager(

  import ExecutorAllocationManager._

  private var cacheRecoveryManager: CacheRecoveryManager = _

  // Lower and upper bounds on the number of executors.
  private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
  private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)

@@ -110,6 +112,9 @@ private[spark] class ExecutorAllocationManager(

  private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")

  // whether or not to try and save cached data when executors are deallocated
  private val recoverCachedData = conf.get(DYN_ALLOCATION_CACHE_RECOVERY)

  // During testing, the methods to actually kill and add executors are mocked out
  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

@@ -212,6 +217,11 @@ private[spark] class ExecutorAllocationManager(

    if (tasksPerExecutor == 0) {
      throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
    }

    if (recoverCachedData && cachedExecutorIdleTimeoutS == Integer.MAX_VALUE) {
      throw new SparkException(s"spark.dynamicAllocation.cachedExecutorIdleTimeout must be set if " +
        s"${DYN_ALLOCATION_CACHE_RECOVERY.key} is true.")
    }
  }

  /**

@@ -243,12 +253,19 @@ private[spark] class ExecutorAllocationManager(

    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)

    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)

    if (recoverCachedData) {
      cacheRecoveryManager = CacheRecoveryManager(this, conf)
    }
  }

  /**
   * Stop the allocation manager.
   */
  def stop(): Unit = {
    if (cacheRecoveryManager != null) {
      cacheRecoveryManager.stop()
    }
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }

@@ -432,68 +449,59 @@

  /**
   * Request the cluster manager to remove the given executors.
   * Returns the list of executors which are removed.
   *
   * @param executors the ids of the executors to be removed
   *
   * @return Unit
   */
  private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
    val executorIdsToBeRemoved = new ArrayBuffer[String]
  private def removeExecutors(executors: Seq[String]): Unit = synchronized {
    logInfo("Request to remove executorIds: " + executors.mkString(", "))
    val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size

    var newExecutorTotal = numExistingExecutors
    executors.foreach { executorIdToBeRemoved =>
      if (newExecutorTotal - 1 < minNumExecutors) {
        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
          s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
      } else if (newExecutorTotal - 1 < numExecutorsTarget) {
        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
          s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
      } else if (canBeKilled(executorIdToBeRemoved)) {
        executorIdsToBeRemoved += executorIdToBeRemoved
        newExecutorTotal -= 1
    val numExistingExecs = allocationManager.executorIds.size - executorsPendingToRemove.size
    val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
    val (executorIdsToBeRemoved, dontRemove) = executors
      .filter(canBeKilled)
      .splitAt(numExistingExecs - execCountFloor)

    if (log.isDebugEnabled()) {
      dontRemove.foreach { execId =>
        logDebug(s"Not removing idle executor $execId because it " +
          s"would put us below the minimum limit of $minNumExecutors executors " +
          s"or number of target executors $numExecutorsTarget")
      }
    }

    if (executorIdsToBeRemoved.isEmpty) {
      return Seq.empty[String]
      Seq.empty[String]
    } else if (testing) {
      recordExecutorKill(executorIdsToBeRemoved)
    } else if (recoverCachedData) {
      client.markPendingToRemove(executorIdsToBeRemoved)
      recordExecutorKill(executorIdsToBeRemoved)
      cacheRecoveryManager.startCacheRecovery(executorIdsToBeRemoved)
    } else {
      val killed = killExecutors(executorIdsToBeRemoved)
      recordExecutorKill(killed)
    }
  }

    // Send a request to the backend to kill this executor(s)
    val executorsRemoved = if (testing) {
      executorIdsToBeRemoved
    } else {
      // We don't want to change our target number of executors, because we already did that
      // when the task backlog decreased.
      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
  def killExecutors(executorIds: Seq[String]): Seq[String] = {
    logDebug(s"Starting kill process for ${executorIds.mkString(", ")}")
    val result = client.killExecutors(executorIds, adjustTargetNumExecutors = false,
      countFailures = false, force = false)
    }
    // [SPARK-21834] killExecutors api reduces the target number of executors.
    // So we need to update the target with desired value.
    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
    // reset the newExecutorTotal to the existing number of executors
    newExecutorTotal = numExistingExecutors
    if (testing || executorsRemoved.nonEmpty) {
      executorsRemoved.foreach { removedExecutorId =>
        newExecutorTotal -= 1
        logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
          s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
        executorsPendingToRemove.add(removedExecutorId)
      }
      executorsRemoved
    } else {
    if (result.isEmpty) {
      logWarning(s"Unable to reach the cluster manager to kill executor/s " +
        s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
      Seq.empty[String]
        s"${executorIds.mkString(",")} or no executor eligible to kill!")
    }
    result
  }

  /**
   * Request the cluster manager to remove the given executor.
   * Return whether the request is acknowledged.
   */
  private def removeExecutor(executorId: String): Boolean = synchronized {
    val executorsRemoved = removeExecutors(Seq(executorId))
    executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
  private def recordExecutorKill(executorsRemoved: Seq[String]): Unit = synchronized {
    // [SPARK-21834] killExecutors api reduces the target number of executors.
    // So we need to update the target with desired value.
    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
    executorsPendingToRemove ++= executorsRemoved
    logInfo(s"Removing executors (${executorsRemoved.mkString(", ")}) because they have been idle " +
      s"for $executorIdleTimeoutS seconds")
  }

  /**
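As an aside on the reworked `removeExecutors` above: the `splitAt(numExistingExecs - execCountFloor)` call caps how many killable candidates can actually be removed before the executor count would drop below the floor. A tiny standalone illustration with hypothetical numbers:

```scala
object SplitAtFloorExample extends App {
  // Hypothetical numbers: 10 executors exist, the floor (max of the configured minimum
  // and the current target) is 7, and five idle executors are candidates for removal.
  val numExistingExecs = 10
  val execCountFloor = 7
  val candidates = Seq("exec-1", "exec-2", "exec-3", "exec-4", "exec-5")

  // Only numExistingExecs - execCountFloor = 3 executors may be removed;
  // the rest are kept so the count never dips below the floor.
  val (toRemove, dontRemove) = candidates.splitAt(numExistingExecs - execCountFloor)

  println(toRemove)   // List(exec-1, exec-2, exec-3)
  println(dontRemove) // List(exec-4, exec-5)
}
```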
@@ -130,6 +130,16 @@ package object config {

      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("3s")

  private[spark] val DYN_ALLOCATION_CACHE_RECOVERY =
    ConfigBuilder("spark.dynamicAllocation.cacheRecovery.enabled")
      .booleanConf
      .createWithDefault(false)

  private[spark] val DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT =
    ConfigBuilder("spark.dynamicAllocation.cacheRecovery.timeout")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefault(120)

  private[spark] val SHUFFLE_SERVICE_ENABLED =
    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
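For context, the new settings would be turned on roughly as below. This is a hypothetical configuration sketch, not taken from the PR; the values are illustrative, and `spark.dynamicAllocation.cachedExecutorIdleTimeout` is included because the validation added above requires it to be set when cache recovery is enabled.

```scala
import org.apache.spark.SparkConf

// Illustrative only: enable dynamic allocation with cache recovery so cached
// blocks are replicated before idle executors are shut down.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s") // required when recovery is on
  .set("spark.dynamicAllocation.cacheRecovery.enabled", "true")
  .set("spark.dynamicAllocation.cacheRecovery.timeout", "120s")
```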
I find `recoveringExecutors` pretty confusing: I think it's executors that are recovering from some problem, but are going to be OK -- not executors that are about to die, which we are recovering data from. How about `drainingExecutors`? (Though I have a feeling this name may have been discussed in earlier rounds of comments and this is what we settled on ... if so, that's fine.)