
Commit 33ae9c7

GraceH and Andrew Or authored and committed
[SPARK-9552] Add force control for killExecutors to avoid falsely killing busy executors
With dynamic allocation enabled, busy executors are sometimes killed by mistake: an executor that has just been assigned tasks can still be killed for having been idle long enough (say 60 seconds). The root cause is that the task-launch listener events are asynchronous. An executor may be in the middle of receiving task assignments while the listener notification has not been sent out yet; meanwhile, the dynamic allocation idle timer for that executor expires (e.g., after 60 seconds) and triggers a killExecutor request. So (1) the timer expires before the listener event arrives, and (2) the task then tries to run on the executor that is being killed, which ultimately causes the task to fail.

The proposal here is to add a force control to killExecutors. When force is not set (i.e., false), we first check whether the executor to be killed is idle or busy; if it still has assignments, we do not kill it and return false to indicate the kill failed. Dynamic allocation turns force killing off (force = false), so an attempt to kill a busy executor fails and the executor's idle timer is not invalidated. When the task-assignment event arrives later, the idle timer is removed accordingly, and busy executors are no longer falsely killed under dynamic allocation. All other callers can decide for themselves whether to use force killing; when that option is on, killExecutors acts without any status check.

Author: Grace <jie.huang@intel.com>
Author: Andrew Or <andrew@databricks.com>
Author: Jie Huang <jie.huang@intel.com>

Closes #7888 from GraceH/forcekill.

(cherry picked from commit 965245d)
Signed-off-by: Andrew Or <andrew@databricks.com>
1 parent 78dc07c commit 33ae9c7
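
The core idea can be sketched as follows. This is a minimal, self-contained Scala sketch of the busy-check described above, not the actual Spark code; the class and member names (ForceKillSketch, taskCount, pendingToRemove) are hypothetical stand-ins for the real bookkeeping in TaskSchedulerImpl and CoarseGrainedSchedulerBackend shown in the diff below.

import scala.collection.mutable

// Hypothetical sketch: track how many tasks each executor is running, and
// refuse to kill a busy executor unless force = true.
class ForceKillSketch {
  private val taskCount = mutable.HashMap.empty[String, Int]     // hypothetical stand-in
  private val pendingToRemove = mutable.HashSet.empty[String]    // hypothetical stand-in

  def onTaskStart(execId: String): Unit =
    taskCount(execId) = taskCount.getOrElse(execId, 0) + 1

  def onTaskEnd(execId: String): Unit =
    taskCount(execId) = math.max(0, taskCount.getOrElse(execId, 0) - 1)

  def isExecutorBusy(execId: String): Boolean =
    taskCount.getOrElse(execId, 0) > 0

  /** Returns the executors actually scheduled for removal. */
  def killExecutors(executorIds: Seq[String], force: Boolean): Seq[String] = {
    val toKill = executorIds
      .filter(id => !pendingToRemove.contains(id))   // already pending removal
      .filter(id => force || !isExecutorBusy(id))    // skip busy executors unless forced
    pendingToRemove ++= toKill
    toKill
  }
}

// Usage: a busy executor survives a non-forced kill request.
object ForceKillSketchDemo extends App {
  val s = new ForceKillSketch
  s.onTaskStart("exec-1")
  println(s.killExecutors(Seq("exec-1"), force = false))  // List() -- executor kept alive
  println(s.killExecutors(Seq("exec-1"), force = true))   // List(exec-1)
}

In the actual patch, dynamic allocation goes through the non-forced path so a busy executor simply fails the kill and keeps running, while SparkContext.killExecutors and killAndReplaceExecutor pass force = true (see the SparkContext diff below).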

File tree

5 files changed (+82, -15 lines)


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Lines changed: 1 addition & 0 deletions

@@ -509,6 +509,7 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorBusy(executorId: String): Unit = synchronized {
     logDebug(s"Clearing idle timer for $executorId because it is now running a task")
     removeTimes.remove(executorId)
+    executorsPendingToRemove.remove(executorId)
   }
 
   /**

core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 2 additions & 2 deletions

@@ -1461,7 +1461,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   override def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(executorIds)
+        b.killExecutors(executorIds, replace = false, force = true)
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained mode")
         false
@@ -1499,7 +1499,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(Seq(executorId), replace = true)
+        b.killExecutors(Seq(executorId), replace = true, force = true)
       case _ =>
         logWarning("Killing executors is only supported in coarse-grained mode")
         false

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
Lines changed: 19 additions & 8 deletions

@@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl(
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Which executor IDs we have executors on
-  val activeExecutorIds = new HashSet[String]
+  // Number of tasks running on each executor
+  private val executorIdToTaskCount = new HashMap[String, Int]
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
@@ -254,6 +254,7 @@ private[spark] class TaskSchedulerImpl(
           val tid = task.taskId
           taskIdToTaskSetManager(tid) = taskSet
           taskIdToExecutorId(tid) = execId
+          executorIdToTaskCount(execId) += 1
           executorsByHost(host) += execId
           availableCpus(i) -= CPUS_PER_TASK
           assert(availableCpus(i) >= 0)
@@ -282,7 +283,7 @@ private[spark] class TaskSchedulerImpl(
     var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
-      activeExecutorIds += o.executorId
+      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
@@ -331,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
           // We lost this entire executor, so remember that it's gone
           val execId = taskIdToExecutorId(tid)
-          if (activeExecutorIds.contains(execId)) {
+
+          if (executorIdToTaskCount.contains(execId)) {
             removeExecutor(execId,
               SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
             failedExecutor = Some(execId)
@@ -341,7 +343,11 @@ private[spark] class TaskSchedulerImpl(
           case Some(taskSet) =>
             if (TaskState.isFinished(state)) {
               taskIdToTaskSetManager.remove(tid)
-              taskIdToExecutorId.remove(tid)
+              taskIdToExecutorId.remove(tid).foreach { execId =>
+                if (executorIdToTaskCount.contains(execId)) {
+                  executorIdToTaskCount(execId) -= 1
+                }
+              }
             }
             if (state == TaskState.FINISHED) {
               taskSet.removeRunningTask(tid)
@@ -462,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
     var failedExecutor: Option[String] = None
 
     synchronized {
-      if (activeExecutorIds.contains(executorId)) {
+      if (executorIdToTaskCount.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
         logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
         removeExecutor(executorId, reason)
@@ -498,7 +504,8 @@ private[spark] class TaskSchedulerImpl(
   * of any running tasks, since the loss reason defines whether we'll fail those tasks.
   */
  private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
-    activeExecutorIds -= executorId
+    executorIdToTaskCount -= executorId
+
    val host = executorIdToHost(executorId)
    val execs = executorsByHost.getOrElse(host, new HashSet)
    execs -= executorId
@@ -535,7 +542,11 @@ private[spark] class TaskSchedulerImpl(
   }
 
   def isExecutorAlive(execId: String): Boolean = synchronized {
-    activeExecutorIds.contains(execId)
+    executorIdToTaskCount.contains(execId)
+  }
+
+  def isExecutorBusy(execId: String): Boolean = synchronized {
+    executorIdToTaskCount.getOrElse(execId, -1) > 0
   }
 
   // By default, rack is unknown

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
Lines changed: 10 additions & 3 deletions

@@ -453,25 +453,32 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @return whether the kill request is acknowledged.
    */
   final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
-    killExecutors(executorIds, replace = false)
+    killExecutors(executorIds, replace = false, force = false)
   }
 
   /**
    * Request that the cluster manager kill the specified executors.
    *
    * @param executorIds identifiers of executors to kill
    * @param replace whether to replace the killed executors with new ones
+   * @param force whether to force kill busy executors
    * @return whether the kill request is acknowledged.
    */
-  final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
+  final def killExecutors(
+      executorIds: Seq[String],
+      replace: Boolean,
+      force: Boolean): Boolean = synchronized {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
     val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
     unknownExecutors.foreach { id =>
       logWarning(s"Executor to kill $id does not exist!")
     }
 
     // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-    val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
+    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+    val executorsToKill = knownExecutors
+      .filter { id => !executorsPendingToRemove.contains(id) }
+      .filter { id => force || !scheduler.isExecutorBusy(id) }
     executorsPendingToRemove ++= executorsToKill
 
     // If we do not wish to replace the executors we kill, sync the target number of executors

core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
Lines changed: 50 additions & 2 deletions

@@ -17,10 +17,11 @@
 
 package org.apache.spark.deploy
 
+import scala.collection.mutable
 import scala.concurrent.duration._
 
 import org.mockito.Mockito.{mock, when}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
@@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 
@@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE
 class StandaloneDynamicAllocationSuite
   extends SparkFunSuite
   with LocalSparkContext
-  with BeforeAndAfterAll {
+  with BeforeAndAfterAll
+  with PrivateMethodTester {
 
   private val numWorkers = 2
   private val conf = new SparkConf()
@@ -404,6 +407,41 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1)
   }
 
+  test("disable force kill for busy executors (SPARK-9552)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
+    var apps = getApplications()
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+
+    // simulate running a task on the executor
+    val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
+    val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
+    val executorIdToTaskCount = taskScheduler invokePrivate getMap()
+    executorIdToTaskCount(executors.head) = 1
+    // kill the busy executor without force; this should fail
+    assert(killExecutor(sc, executors.head, force = false))
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+
+    // force kill busy executor
+    assert(killExecutor(sc, executors.head, force = true))
+    apps = getApplications()
+    // kill executor successfully
+    assert(apps.head.executors.size === 1)
+
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================
@@ -455,6 +493,16 @@ class StandaloneDynamicAllocationSuite
     sc.killExecutors(getExecutorIds(sc).take(n))
   }
 
+  /** Kill the given executor, specifying whether to force kill it. */
+  private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
+    syncExecutors(sc)
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutors(Seq(executorId), replace = false, force)
+      case _ => fail("expected coarse grained scheduler")
+    }
+  }
+
   /**
    * Return a list of executor IDs belonging to this application.
    *
