[SPARK-18553][CORE][BRANCH-1.6] Fix leak of TaskSetManager following executor loss #16070

Closed
TaskSchedulerImpl.scala
@@ -87,8 +87,8 @@ private[spark] class TaskSchedulerImpl(
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Number of tasks running on each executor
-  private val executorIdToTaskCount = new HashMap[String, Int]
+  // IDs of the tasks running on each executor
+  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
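
Why the bookkeeping changes from a per-executor task count to a per-executor set of task IDs: when an executor dies, the scheduler must drop every per-task entry (taskIdToTaskSetManager, taskIdToExecutorId) for tasks that were running on it, and a bare count cannot say which entries those are. Below is a minimal, self-contained sketch of that idea; it reuses the map names from this PR but is not Spark code, and the object and method names are illustrative only.

import scala.collection.mutable.{HashMap, HashSet}

// Illustrative sketch: tracking the *set* of running task IDs per executor
// makes it possible to clean up all per-task state when the executor is lost.
object RunningTaskBookkeepingSketch {
  private val taskIdToExecutorId = new HashMap[Long, String]
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  def taskLaunched(tid: Long, execId: String): Unit = {
    taskIdToExecutorId(tid) = execId
    executorIdToRunningTaskIds.getOrElseUpdate(execId, HashSet[Long]()).add(tid)
  }

  def executorLost(execId: String): Unit = {
    // Remove the executor entry and clean up every task that was running on it.
    executorIdToRunningTaskIds.remove(execId).foreach { taskIds =>
      taskIds.foreach(taskIdToExecutorId.remove)
    }
  }

  def main(args: Array[String]): Unit = {
    taskLaunched(0L, "executor0")
    taskLaunched(1L, "executor0")
    executorLost("executor0")
    // With only a running-task count, these two per-task entries would have leaked.
    assert(taskIdToExecutorId.isEmpty)
    println("no leaked per-task state")
  }
}
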
@@ -254,7 +254,7 @@ private[spark] class TaskSchedulerImpl(
             val tid = task.taskId
             taskIdToTaskSetManager(tid) = taskSet
             taskIdToExecutorId(tid) = execId
-            executorIdToTaskCount(execId) += 1
+            executorIdToRunningTaskIds(execId).add(tid)
             executorsByHost(host) += execId
             availableCpus(i) -= CPUS_PER_TASK
             assert(availableCpus(i) >= 0)
@@ -283,7 +283,7 @@ private[spark] class TaskSchedulerImpl(
     var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
-      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
+      executorIdToRunningTaskIds.getOrElseUpdate(o.executorId, HashSet[Long]())
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
@@ -329,37 +329,34 @@ private[spark] class TaskSchedulerImpl(
     var failedExecutor: Option[String] = None
     synchronized {
       try {
-        if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
-          // We lost this entire executor, so remember that it's gone
-          val execId = taskIdToExecutorId(tid)
-
-          if (executorIdToTaskCount.contains(execId)) {
-            removeExecutor(execId,
-              SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
-            failedExecutor = Some(execId)
-          }
-        }
         taskIdToTaskSetManager.get(tid) match {
           case Some(taskSet) =>
-            if (TaskState.isFinished(state)) {
-              taskIdToTaskSetManager.remove(tid)
-              taskIdToExecutorId.remove(tid).foreach { execId =>
-                if (executorIdToTaskCount.contains(execId)) {
-                  executorIdToTaskCount(execId) -= 1
-                }
+            if (state == TaskState.LOST) {
+              // TaskState.LOST is only used by the Mesos fine-grained scheduling mode,
              (Inline review comment from a Contributor: "nice catch on the comment here!")
+              // where each executor corresponds to a single task, so mark the executor as failed.
+              val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
+                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
+              if (executorIdToRunningTaskIds.contains(execId)) {
+                val reason =
+                  SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")
+                removeExecutor(execId, reason)
+                failedExecutor = Some(execId)
               }
             }
-            if (state == TaskState.FINISHED) {
-              taskSet.removeRunningTask(tid)
-              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
-            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
+            if (TaskState.isFinished(state)) {
+              cleanupTaskState(tid)
               taskSet.removeRunningTask(tid)
-              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
+              if (state == TaskState.FINISHED) {
+                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
+                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
+              }
             }
           case None =>
             logError(
               ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
-                "likely the result of receiving duplicate task finished status updates)")
+                "likely the result of receiving duplicate task finished status updates) or its " +
+                "executor has been marked as failed.")
                 .format(state, tid))
         }
       } catch {
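
A note on the getOrElse(tid, throw ...) call added above: in Scala, getOrElse's default argument is evaluated by name, so the IllegalStateException is only constructed and thrown when tid is genuinely missing from taskIdToExecutorId, which would mean the two per-task maps had diverged. Here is a minimal sketch of that idiom, using illustrative data only rather than the scheduler's real state.

import scala.collection.mutable.HashMap

// Illustrative sketch of the getOrElse-with-throw idiom: the default is
// by-name, so the exception is only raised when the key is absent.
object ByNameDefaultSketch {
  def main(args: Array[String]): Unit = {
    val taskIdToExecutorId = new HashMap[Long, String]
    taskIdToExecutorId(0L) = "executor0"

    val execId = taskIdToExecutorId.getOrElse(0L, throw new IllegalStateException(
      "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
    println(s"task 0 ran on $execId")  // prints: task 0 ran on executor0
  }
}
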
@@ -468,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
     var failedExecutor: Option[String] = None
 
     synchronized {
-      if (executorIdToTaskCount.contains(executorId)) {
+      if (executorIdToRunningTaskIds.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
         logExecutorLoss(executorId, hostPort, reason)
         removeExecutor(executorId, reason)
@@ -510,13 +507,31 @@ private[spark] class TaskSchedulerImpl(
     logError(s"Lost executor $executorId on $hostPort: $reason")
   }
 
+  /**
+   * Cleans up the TaskScheduler's state for tracking the given task.
+   */
+  private def cleanupTaskState(tid: Long): Unit = {
+    taskIdToTaskSetManager.remove(tid)
+    taskIdToExecutorId.remove(tid).foreach { executorId =>
+      executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
+    }
+  }
+
   /**
    * Remove an executor from all our data structures and mark it as lost. If the executor's loss
    * reason is not yet known, do not yet remove its association with its host nor update the status
   * of any running tasks, since the loss reason defines whether we'll fail those tasks.
   */
   private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
-    executorIdToTaskCount -= executorId
+    // The tasks on the lost executor may not send any more status updates (because the executor
+    // has been lost), so they should be cleaned up here.
+    executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
+      logDebug("Cleaning up TaskScheduler state for tasks " +
+        s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
+      // We do not notify the TaskSetManager of the task failures because that will
+      // happen below in the rootPool.executorLost() call.
+      taskIds.foreach(cleanupTaskState)
+    }
 
     val host = executorIdToHost(executorId)
     val execs = executorsByHost.getOrElse(host, new HashSet)
@@ -554,11 +569,11 @@ private[spark] class TaskSchedulerImpl(
   }
 
   def isExecutorAlive(execId: String): Boolean = synchronized {
-    executorIdToTaskCount.contains(execId)
+    executorIdToRunningTaskIds.contains(execId)
   }
 
   def isExecutorBusy(execId: String): Boolean = synchronized {
-    executorIdToTaskCount.getOrElse(execId, -1) > 0
+    executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
   }
 
   // By default, rack is unknown
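
The new isExecutorBusy check leans on Option.exists: it returns false both for an executor the scheduler has never seen (None) and for a known executor whose running-task set is empty, mirroring the old getOrElse(execId, -1) > 0 count check. A small illustrative sketch of that behavior, with made-up executor names:

import scala.collection.mutable.{HashMap, HashSet}

// Illustrative sketch of the Option.exists-based busy check.
object BusyCheckSketch {
  def main(args: Array[String]): Unit = {
    val running = new HashMap[String, HashSet[Long]]
    running("idleExec") = HashSet[Long]()
    running("busyExec") = HashSet(7L)

    def isBusy(execId: String): Boolean = running.get(execId).exists(_.nonEmpty)

    println(isBusy("unknownExec"))  // false: executor not registered
    println(isBusy("idleExec"))     // false: registered but no running tasks
    println(isBusy("busyExec"))     // true
  }
}
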
StandaloneDynamicAllocationSuite.scala
@@ -425,10 +425,11 @@ class StandaloneDynamicAllocationSuite
     assert(executors.size === 2)
 
     // simulate running a task on the executor
-    val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
+    val getMap =
+      PrivateMethod[mutable.HashMap[String, mutable.HashSet[Long]]]('executorIdToRunningTaskIds)
     val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
-    val executorIdToTaskCount = taskScheduler invokePrivate getMap()
-    executorIdToTaskCount(executors.head) = 1
+    val executorIdToRunningTaskIds = taskScheduler invokePrivate getMap()
+    executorIdToRunningTaskIds(executors.head) = mutable.HashSet(1L)
     // kill the busy executor without force; this should fail
     assert(killExecutor(sc, executors.head, force = false))
     apps = getApplications()
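
The test above reaches into TaskSchedulerImpl's private state through ScalaTest's PrivateMethodTester, so the only change needed here is the map's name and element type. A standalone sketch of that mechanism, using a made-up class rather than Spark's scheduler:

import org.scalatest.PrivateMethodTester

// Illustrative sketch of ScalaTest's PrivateMethodTester, which the suite
// above uses to read TaskSchedulerImpl's private map.
object PrivateAccessSketch extends PrivateMethodTester {
  class Counter { private def bump(x: Int): Int = x + 1 }

  def main(args: Array[String]): Unit = {
    val bump = PrivateMethod[Int]('bump)
    val result = new Counter invokePrivate bump(41)
    println(result)  // 42
  }
}
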
TaskSchedulerImplSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.nio.ByteBuffer
+
 import org.apache.spark._
 
 class FakeSchedulerBackend extends SchedulerBackend {
@@ -273,4 +275,68 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
     assert("executor1" === taskDescriptions3(0).executorId)
   }
 
+  test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+
+    val e0Offers = Seq(WorkerOffer("executor0", "host0", 1))
+    val attempt1 = FakeTask.createTaskSet(1)
+
+    // submit attempt 1, offer resources, task gets scheduled
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
+    assert(1 === taskDescriptions.length)
+
+    // mark executor0 as dead
+    taskScheduler.executorLost("executor0", SlaveLost())
+    assert(!taskScheduler.isExecutorAlive("executor0"))
+    assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
+    assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
+
+    // Check that state associated with the lost task attempt is cleaned up:
+    assert(taskScheduler.taskIdToExecutorId.isEmpty)
+    assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
+  }
+
+  test("if a task finishes with TaskState.LOST its executor is marked as dead") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+
+    val e0Offers = Seq(WorkerOffer("executor0", "host0", 1))
+    val attempt1 = FakeTask.createTaskSet(1)
+
+    // submit attempt 1, offer resources, task gets scheduled
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
+    assert(1 === taskDescriptions.length)
+
+    // Report the task as failed with TaskState.LOST
+    taskScheduler.statusUpdate(
+      tid = taskDescriptions.head.taskId,
+      state = TaskState.LOST,
+      serializedData = ByteBuffer.allocate(0)
+    )
+
+    // Check that state associated with the lost task attempt is cleaned up:
+    assert(taskScheduler.taskIdToExecutorId.isEmpty)
+    assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
+
+    // Check that the executor has been marked as dead
+    assert(!taskScheduler.isExecutorAlive("executor0"))
+    assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
+    assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
+  }
 }