[SPARK-9552] Add force control for killExecutors to avoid falsely killing busy executors #7888

Closed

wants to merge 22 commits
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -509,6 +509,7 @@ private[spark] class ExecutorAllocationManager(
private def onExecutorBusy(executorId: String): Unit = synchronized {
logDebug(s"Clearing idle timer for $executorId because it is now running a task")
removeTimes.remove(executorId)
executorsPendingToRemove.remove(executorId)
}

/**
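Why the extra removal matters: if a kill request for an executor was filtered out (or is still in flight) and that executor then starts running a task, the allocation manager should stop treating it as pending removal. Below is a minimal, self-contained sketch of that bookkeeping in the spirit of the hunk above; PendingRemovalTracker and its methods are illustrative names, not Spark code.

import scala.collection.mutable

// Illustrative-only model of the idle/busy bookkeeping touched by the hunk above.
class PendingRemovalTracker {
  private val removeTimes = new mutable.HashMap[String, Long]        // executorId -> idle-expiry time
  private val executorsPendingToRemove = new mutable.HashSet[String]

  def markPendingToRemove(executorId: String): Unit =
    executorsPendingToRemove += executorId

  // Mirrors the patched onExecutorBusy: a busy executor is neither idle nor pending removal.
  def onExecutorBusy(executorId: String): Unit = {
    removeTimes.remove(executorId)
    executorsPendingToRemove -= executorId
  }

  def onExecutorIdle(executorId: String, nowMs: Long, idleTimeoutMs: Long): Unit =
    removeTimes.getOrElseUpdate(executorId, nowMs + idleTimeoutMs)

  def isPendingToRemove(executorId: String): Boolean =
    executorsPendingToRemove.contains(executorId)
}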
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1451,7 +1451,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
override def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
b.killExecutors(executorIds, replace = false, force = true)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -1489,7 +1489,7 @@
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(Seq(executorId), replace = true)
b.killExecutors(Seq(executorId), replace = true, force = true)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
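A usage sketch (not part of the PR): after this change, kills issued through the public SparkContext API are forced, so they are not filtered out even if the target executor is running tasks. This assumes an application on a coarse-grained cluster manager such as standalone; the executor ID "0" is made up.

import org.apache.spark.{SparkConf, SparkContext}

object ForceKillSketch {
  def main(args: Array[String]): Unit = {
    // In local mode killExecutors only logs a warning and returns false,
    // so assume a coarse-grained deployment here.
    val sc = new SparkContext(new SparkConf().setAppName("force-kill-sketch"))

    // Routed to CoarseGrainedSchedulerBackend.killExecutors(ids, replace = false, force = true).
    val acknowledged = sc.killExecutors(Seq("0"))
    println(s"kill request acknowledged: $acknowledged")

    sc.stop()
  }
}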
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl(
// Incrementing task IDs
val nextTaskId = new AtomicLong(0)

// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]
// Number of tasks running on each executor
private val executorIdToTaskCount = new HashMap[String, Int]

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
@@ -254,6 +254,7 @@ private[spark] class TaskSchedulerImpl(
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
@@ -282,7 +283,7 @@
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
activeExecutorIds += o.executorId
executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
@@ -331,7 +332,8 @@
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
if (activeExecutorIds.contains(execId)) {

if (executorIdToTaskCount.contains(execId)) {
removeExecutor(execId,
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
@@ -341,7 +343,11 @@
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid)
taskIdToExecutorId.remove(tid).foreach { execId =>
if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
}
}
Review comment (Contributor):

taskIdToExecutorId.remove(tid).foreach { eid =>
  if (executorIdToTaskCount.contains(execId)) {
    executorIdToTaskCount(execId) -= 1
  }
}

Reply (Contributor Author):

I will change that.

}
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
@@ -462,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
var failedExecutor: Option[String] = None

synchronized {
if (activeExecutorIds.contains(executorId)) {
if (executorIdToTaskCount.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
removeExecutor(executorId, reason)
@@ -484,7 +490,8 @@

/** Remove an executor from all our data structures and mark it as lost */
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIds -= executorId
executorIdToTaskCount -= executorId

val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
execs -= executorId
@@ -518,7 +525,11 @@
}

def isExecutorAlive(execId: String): Boolean = synchronized {
activeExecutorIds.contains(execId)
executorIdToTaskCount.contains(execId)
}

def isExecutorBusy(execId: String): Boolean = synchronized {
executorIdToTaskCount.getOrElse(execId, -1) > 0
}

// By default, rack is unknown
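Taken together, the TaskSchedulerImpl changes replace the activeExecutorIds set with a per-executor running-task count. Here is a self-contained sketch of that lifecycle (illustrative names, not Spark code): registration seeds a zero entry, task launch and completion adjust it, and "busy" simply means a positive count.

import scala.collection.mutable

// Illustrative-only mirror of the TaskSchedulerImpl bookkeeping above.
class TaskCountTracker {
  private val executorIdToTaskCount = new mutable.HashMap[String, Int]

  def registerExecutor(execId: String): Unit =
    executorIdToTaskCount.getOrElseUpdate(execId, 0)            // resource-offer path

  def taskLaunched(execId: String): Unit =
    executorIdToTaskCount(execId) = executorIdToTaskCount.getOrElse(execId, 0) + 1

  def taskFinished(execId: String): Unit =
    executorIdToTaskCount.get(execId).foreach { n => executorIdToTaskCount(execId) = n - 1 }

  def removeExecutor(execId: String): Unit =
    executorIdToTaskCount -= execId                              // executor lost

  def isExecutorAlive(execId: String): Boolean = executorIdToTaskCount.contains(execId)
  def isExecutorBusy(execId: String): Boolean = executorIdToTaskCount.getOrElse(execId, -1) > 0
}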
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -411,25 +411,32 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @return whether the kill request is acknowledged.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
killExecutors(executorIds, replace = false)
killExecutors(executorIds, replace = false, force = false)
}

/**
* Request that the cluster manager kill the specified executors.
*
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones
* @param force whether to force kill busy executors
* @return whether the kill request is acknowledged.
*/
final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Boolean = synchronized {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
}

// If an executor is already pending to be removed, do not kill it again (SPARK-9795)
val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
// If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsPendingToRemove ++= executorsToKill

// If we do not wish to replace the executors we kill, sync the target number of executors
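The effect of the new force flag is easiest to see in isolation. The sketch below mirrors (but is not) the filtering above; pendingToRemove and isBusy stand in for executorsPendingToRemove and scheduler.isExecutorBusy.

object KillFilterSketch {
  // Illustrative-only: which of the known executors a kill request would actually target.
  def selectExecutorsToKill(
      knownExecutors: Seq[String],
      pendingToRemove: Set[String],
      isBusy: String => Boolean,
      force: Boolean): Seq[String] = {
    knownExecutors
      .filter(id => !pendingToRemove.contains(id))   // already being removed (SPARK-9795)
      .filter(id => force || !isBusy(id))            // spare busy executors unless forced (SPARK-9552)
  }

  def main(args: Array[String]): Unit = {
    val busy = Set("1")
    // Without force the busy executor "1" is spared; with force it is targeted as well.
    println(selectExecutorsToKill(Seq("1", "2"), Set.empty, busy.contains, force = false)) // List(2)
    println(selectExecutorsToKill(Seq("1", "2"), Set.empty, busy.contains, force = true))  // List(1, 2)
  }
}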
core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,11 @@

package org.apache.spark.deploy

import scala.collection.mutable
import scala.concurrent.duration._

import org.mockito.Mockito.{mock, when}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._

import org.apache.spark._
@@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor

@@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE
class StandaloneDynamicAllocationSuite
extends SparkFunSuite
with LocalSparkContext
with BeforeAndAfterAll {
with BeforeAndAfterAll
with PrivateMethodTester {

private val numWorkers = 2
private val conf = new SparkConf()
@@ -404,6 +407,41 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1)
}

test("disable force kill for busy executors (SPARK-9552)") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 2)
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
var apps = getApplications()
// sync executors between the Master and the driver, needed because
// the driver refuses to kill executors it does not know about
syncExecutors(sc)
val executors = getExecutorIds(sc)
assert(executors.size === 2)

// simulate running a task on the executor
val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
val executorIdToTaskCount = taskScheduler invokePrivate getMap()
executorIdToTaskCount(executors.head) = 1
// kill the busy executor without force; this should fail
assert(killExecutor(sc, executors.head, force = false))
apps = getApplications()
assert(apps.head.executors.size === 2)

// force kill busy executor
assert(killExecutor(sc, executors.head, force = true))
apps = getApplications()
// kill executor successfully
assert(apps.head.executors.size === 1)

}

// ===============================
// | Utility methods for testing |
// ===============================
@@ -455,6 +493,16 @@
sc.killExecutors(getExecutorIds(sc).take(n))
}

/** Kill the given executor, specifying whether to force kill it. */
private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
syncExecutors(sc)
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(Seq(executorId), replace = false, force)
case _ => fail("expected coarse grained scheduler")
}
}

/**
* Return a list of executor IDs belonging to this application.
*
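The new test reaches the private executorIdToTaskCount map through ScalaTest's PrivateMethodTester, which invokes the compiler-generated accessor of a private val. A minimal sketch of that pattern with made-up names (Counter, PrivateAccessDemo), not tied to Spark:

import scala.collection.mutable

import org.scalatest.PrivateMethodTester

// A throwaway class with a private mutable map, standing in for TaskSchedulerImpl.
class Counter {
  private val counts = new mutable.HashMap[String, Int]
  def total: Int = counts.values.sum
}

object PrivateAccessDemo extends PrivateMethodTester {
  def main(args: Array[String]): Unit = {
    val getCounts = PrivateMethod[mutable.HashMap[String, Int]]('counts)
    val counter = new Counter
    // invokePrivate calls the private accessor by name and returns the live map.
    val counts = counter invokePrivate getCounts()
    counts("exec-1") = 1
    println(counter.total)   // 1
  }
}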