
[SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManager following executor loss #15986


Conversation

JoshRosen
Contributor

@JoshRosen JoshRosen commented Nov 23, 2016

What changes were proposed in this pull request?

This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails.

This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several ShuffleDependency instances that were retained by TaskSetManagers inside the scheduler but were not otherwise referenced. Each of these TaskSetManagers was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the TaskSchedulerImpl.taskIdToTaskSetManager map.

Entries are added to the taskIdToTaskSetManager map when tasks are launched and are removed inside of TaskScheduler.statusUpdate(), which is invoked by the scheduler backend while processing StatusUpdate messages from executors. The problem with this design is that a completely dead executor will never send a StatusUpdate. There is some code in statusUpdate which handles tasks that exit with the TaskState.LOST state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The executorLost and removeExecutor methods don't appear to perform any cleanup of the taskId -> * mappings, causing the leaks observed here.

This patch's fix is to maintain an executorId -> running task id mapping so that these taskId -> * maps can be properly cleaned up following an executor loss.
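
As a rough, standalone sketch of the bookkeeping this adds (toy code with simplified types, not the actual TaskSchedulerImpl):

import scala.collection.mutable.{HashMap, HashSet}

object SchedulerStateSketch {
  // Stand-ins for the scheduler's real maps (taskIdToTaskSetManager is modelled
  // here as a map to a task-set name, just to keep the sketch self-contained).
  val taskIdToTaskSetName = new HashMap[Long, String]
  val taskIdToExecutorId = new HashMap[Long, String]
  val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  def recordTaskLaunch(tid: Long, execId: String, taskSetName: String): Unit = {
    taskIdToTaskSetName(tid) = taskSetName
    taskIdToExecutorId(tid) = execId
    executorIdToRunningTaskIds.getOrElseUpdate(execId, new HashSet[Long]) += tid
  }

  // A dead executor never sends the StatusUpdate that normally removes these
  // entries, so the reverse mapping lets us clean them up on executor loss.
  def cleanupAfterExecutorLoss(execId: String): Unit = {
    executorIdToRunningTaskIds.remove(execId).foreach { taskIds =>
      taskIds.foreach { tid =>
        taskIdToTaskSetName.remove(tid)
        taskIdToExecutorId.remove(tid)
      }
    }
  }
}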

There are some potential corner-case interactions that I'm concerned about here, especially some details in the comment in removeExecutor, so I'd appreciate a very careful review of these changes.

This PR is opened against branch-2.0, where I first observed this problem, but will also need to be fixed in master, branch-2.1, and branch-1.6 (which I'll do in followup PRs after this fix is reviewed and merged).

How was this patch tested?

I added a new unit test to TaskSchedulerImplSuite. You can check out this PR as of 25e455e to see the failing test.

cc @kayousterhout, @markhamstra, @rxin for review.

@@ -89,9 +89,11 @@ private[spark] class TaskSchedulerImpl(
val nextTaskId = new AtomicLong(0)

// Number of tasks running on each executor
private val executorIdToTaskCount = new HashMap[String, Int]
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
Contributor Author

To pre-emptively address any concerns about the memory usage implications of this change, note that the hash set sizes should be bounded by the number of cores / task slots on the executor, so I don't think that there's much to be gained by using a more memory-efficient HashMap structure here. The only real optimization that I could think of would be to replace the map with a fixed-size array that we just linearly scan, but that seems like premature optimization and adds a lot of hard-to-reason-about complexity.


def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
def runningTasksByExecutors(): Map[String, Int] = synchronized {
Contributor Author

The synchronized here fixes a thread-safety bug added in #11888

@@ -525,7 +525,12 @@ private[spark] class TaskSchedulerImpl(
* of any running tasks, since the loss reason defines whether we'll fail those tasks.
Contributor Author

I'm a bit confused by this comment, since it seems to suggest that we'll do some sort of per-task cleanup at some later time. If anyone knows which cleanup this is referring to, then maybe we should consider not updating executorIdToRunningTaskIds here at all and should instead perform the taskIdToExecutorId and taskIdToTaskSetManager updates somewhere else.

On the other hand, the only place where we currently remove entries from taskIdToExecutorId and taskIdToTaskSetManager is in statusUpdate, so my hunch is that the eventual cleanup alluded to here isn't happening in standalone mode.

Contributor Author

Based on the "tasks are not re-scheduled while executor loss reason is pending" test in TaskSchedulerImplSuite, it looks like the API contract here is that if executorLost is called with LossReasonPending then it will eventually be called again with some other reason. This will cause it to call rootPool.executorLost(), which, in turn, will call executorLost for all TaskSetManagers, which will use their own internal executorId-to-task-id mappings to mark tasks as failed and inform the DAGScheduler. The TaskSetManager doesn't call back into the TaskScheduler to access any of the data in these mappings, so I think it's safe to clean them up immediately at the top of removeExecutor rather than putting them behind the reason != LossReasonPending check.

Note that it's also not as simple as just putting those behind reason != LossReasonPending as a defensive measure, because then we'd be changing the contract on when runningTasksByExecutors() is updated: previously, it would set a failed executor's running task count to zero as soon as the executor failed, whereas if we moved this update behind that check it would only do so once the loss reason was known.

I think that these subtleties / distinctions are only relevant to YARN mode, so I'll loop in @vanzin to comment on them.

Contributor

This has been a while, so my memory of how all this works is a little hazy. But yes, this only applies in YARN mode. Other backends do not use LossReasonPending.

From a quick look at the code, removeExecutor maintains the host-to-executor-id mapping so that it can differentiate between an executor that has already been removed and one whose exit reason is unknown. This avoids calling Pool.executorLost a second time in the first case. Not sure how much that distinction matters though, maybe it's ok to call it again and this code can be simplified.

Contributor

Josh, my understanding is consistent with yours -- that if executorLost() is called with LossReasonPending, it will eventually be called again with a real loss reason. To elaborate on what Marcelo said, executorIdToHost is used as the "gatekeeper" for this: on this line, we use the existence of an entry in executorIdToHost to determine whether we need to call removeExecutor (possibly for a 2nd time).

My only concern with the current approach is the following: the task doesn't get marked as finished in the task set manager until rootPool.executorLost gets called (i.e., until the 2nd removeExecutor call). Is it possible for anything to go awry for task updates that happen between the first removeExecutor call (at which point the TaskSchedulerImpl's state for the task will be removed) and when the task is marked as failed and removed from the TSM? I looked over this and I think things should be fine -- we'll just drop the update -- but can you update the error message we log on 369 (since now it can happen when an update happens after an executor has been lost, in addition to when duplicate updates are received)?
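
For readers following along, a toy sketch of the two-phase flow described above (hypothetical standalone code; per the discussion, the real gatekeeping lives in TaskSchedulerImpl's executorLost / removeExecutor):

import scala.collection.mutable.HashMap

object ExecutorLossSketch {
  sealed trait ExecutorLossReason
  case object LossReasonPending extends ExecutorLossReason
  final case class SlaveLost(message: String) extends ExecutorLossReason

  val executorIdToHost = new HashMap[String, String]

  // The first call may carry LossReasonPending; the executorIdToHost entry is
  // kept until a real reason arrives, so a second call still gets through.
  def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
    if (executorIdToHost.contains(execId)) {
      removeExecutor(execId, reason)
    }
    // No entry means the executor was already fully removed, so a duplicate
    // notification is ignored.
  }

  private def removeExecutor(execId: String, reason: ExecutorLossReason): Unit = {
    // ... per-executor cleanup would happen here ...
    if (reason != LossReasonPending) {
      executorIdToHost.remove(execId)
      // ... and only now would rootPool.executorLost(...) fail the tasks ...
    }
  }
}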

Contributor

Is it possible for anything to go awry for task updates that happen between the first removeExecutor call ... and when the task is marked as failed

If there's something really wrong with the YARN AM, it can take some time, but eventually the second call will come, either with the actual reason or with a generic "executor lost" reason. This is handled in YarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver.

@@ -339,7 +341,7 @@ private[spark] class TaskSchedulerImpl(
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)

if (executorIdToTaskCount.contains(execId)) {
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
Contributor Author

Following the call to removeExecutor here, the taskIdToTaskSetManager.get(tid) call immediately following this if will be guaranteed to return None. This is going to slightly change the behavior of how TaskState.LOST events are handled, which, given my earlier comments, I believe is only relevant to Mesos fine-grained mode (I think this is underscored by the error message s"Task $tid was lost, so marking the executor as lost as well.", since this only makes sense if you assume one task per executor).

Given this, I'm planning to refactor this block slightly so that the body of the if corresponds to the old code and so that the code following the if is only run for non-TaskState.LOST end reasons.

Contributor Author

I added some logging in this TaskState.LOST branch and it looks like this case isn't hit at all in our existing scheduler tests.

Contributor Author

More archaeology:

It looks like TaskState.LOST was introduced in Spark 0.6.0 as part of a refactoring to make the cluster scheduler pluggable: e72afdb. That commit is from July 2012.

At the time, standalone mode didn't even exist and the schedulers were Mesos and local mode, and only Mesos fine-grained mode was supported. The only way to get a TaskState.LOST state was to convert the Mesos task loss state to it.

Contributor Author
@JoshRosen JoshRosen Nov 23, 2016

To look at this another way, note that TaskSchedulerImpl.statusUpdate is only called from three places:

  • LocalSchedulerBackend, which won't use TaskState.LOST
  • MesosFineGrainedSchedulerBackend, where TaskState.LOST means the total loss of an executor that corresponded to a single task (due to fine-grained mode)
  • CoarseGrainedSchedulerBackend, where this is only called with a state that comes from a StatusUpdate message sent by an executor. This task state will never be TaskState.LOST.

Given all of this, I think that the right course of action here is to update the comments to clarify that TaskState.LOST is only relevant to fine-grained Mesos mode and to refactor this block to call

taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)

after calling removeExecutor for the Mesos fine-grained task, and then skip the rest of the logic, which only applies to local mode or coarse-grained schedulers.

@@ -89,9 +89,11 @@ private[spark] class TaskSchedulerImpl(
val nextTaskId = new AtomicLong(0)

// Number of tasks running on each executor
Contributor

update this comment

taskScheduler.executorLost("executor0", SlaveLost())

// Check that state associated with the lost task attempt is cleaned up:
assert(taskScheduler.taskIdToExecutorId.isEmpty)
Contributor Author

I suppose that we should also strengthen the assertions in the existing tests to check that these maps are updated following task successes, but this may be tricky given that the existing tests aren't exercising the statusUpdate path. Rather, we may have to test this more end-to-end by asserting that these always become empty once all jobs and tasks are done.

Contributor

Yeah, is it prohibitively difficult to add a 2nd test for when the executorLost happens as a result of a status update call (e.g., have two tasks running on a particular executor, fail one of them via statusUpdate, and make sure the other one's state gets cleaned up)? The scheduler code is such a tangled web that it would be nice to test both code paths.

Contributor Author

I don't think that case can occur because TaskState.LOST should only be sent in Mesos fine-grained mode where we'll only ever have a single task per executor (in other words, executorLoss can only happen via statusUpdate in situations where there would never be concurrent tasks on an executor).

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69046 has finished for PR 15986 at commit 69feae3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Nov 23, 2016

@JoshRosen
Contributor Author

I managed to come up with a standalone end-to-end reproduction of the shuffle file leak, allowing me to validate this patch's fix.

Run

./bin/spark-shell --master=local-cluster[2,5,1024] --conf spark.task.maxFailures=1 --conf spark.local.dir=/tmp

to bring up a Spark shell with two executor JVMs. Then, execute the following:

sc.parallelize(1 to 10, 10).map { x => Thread.sleep(1000); (x, x) }.groupByKey(10).map { 
    case _ => Thread.sleep(120 * 1000); 1
}.union(sc.parallelize(1 to 10)).count()

(Note that the .union() here is critical for the reproduction; I explain this below).

The Thread.sleep() calls were strategically chosen so that we'll get the executor JVMs into a state where both executors have run shuffle map tasks and both are in the middle of running reduce / result tasks.

Next, kill one of the executor JVMs abruptly with kill -9. The worker JVM will immediately detect its executor JVM's death and will send messages to the master causing that executor's tasks to be marked as failed. Because of spark.task.maxFailures=1 this will cause the job to immediately fail but there will still be five running zombie tasks on the executor that we didn't kill.

Wait until those zombie tasks have finished (which will happen within two minutes), then run System.gc(), then check the non-killed executor's block manager directories and observe that shuffle files have been leaked. This is due to the leak of the ShuffleDependency, which can be validated with jmap -histo:

$ jmap -histo 72081 | grep ShuffleDependency
2037:             1             56  org.apache.spark.ShuffleDependency

This is because the TaskSetManager was leaked:

$ jmap -histo 72081 | grep 'org.apache.spark.scheduler.TaskSetManager$'
1252:             1            224  org.apache.spark.scheduler.TaskSetManager

Note that while executor death seems to always leak a TaskSetManager, this doesn't always result in a leaked ShuffleDependency; the reasons for this are slightly subtle and I can expand on them later, but in a nutshell: a Task whose partition is a ShuffleRddPartition won't actually contain a reference to the parent RDD; the parent RDD and ShuffleDependency will be kept alive in the scheduler via the parent stage and via inter-stage relationships, but there won't be a direct reference chain from the Task itself. On the other hand, some partition types such as UnionRDDPartition may have transient references to parent RDD objects, causing the driver-side Task to keep the whole RDD and ShuffleDependency lineage chain alive. This usually isn't a problem, since Tasks typically don't get leaked like this and the @transient fields prevent us from over-capturing during serialization, but it exacerbates the TaskSetManager leaks here.
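
A small hedged illustration of that last point, using toy classes rather than Spark's real RDD/Partition types:

// A @transient field is skipped during serialization, but a live driver-side
// object still holds the reference.
class ParentLineage(val payload: Array[Byte]) extends Serializable

class ToyPartition(@transient val parent: ParentLineage) extends Serializable
// Serializing a ToyPartition will not capture 'parent', yet as long as the
// ToyPartition instance itself stays alive on the driver (e.g. via a leaked
// TaskSetManager), 'parent' and everything it references remain reachable.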

After applying this PR's changes, you can re-run the same experiment and see that both the TaskSetManager and ShuffleDependency are properly cleaned up after the zombie tasks finish and GC has run to trigger the ContextCleaner.

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69053 has finished for PR 15986 at commit e99cc8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
}
executorIdToRunningTaskIds.remove(execId)
Contributor Author

Spotted a bug in this fix: this should actually be executorIdToRunningTaskIds(execId).remove(tid).
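
For what it's worth, a standalone sketch of the corrected per-task cleanup (toy code mirroring the map, not the patch itself):

import scala.collection.mutable.{HashMap, HashSet}

object PerTaskCleanupSketch {
  val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  // Remove only the finished task id from the executor's running-task set,
  // rather than dropping the executor's whole entry as the buggy line did.
  def markTaskFinished(execId: String, tid: Long): Unit = {
    executorIdToRunningTaskIds.get(execId).foreach(_ -= tid)
  }
}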

@@ -335,31 +337,31 @@ private[spark] class TaskSchedulerImpl(
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
Contributor Author

To understand this change, note that taskIdToExecutorId and taskIdToTaskSetManager should always have the same key set, so the case Some(taskSet) if state == TaskState.LOST is equivalent to this if statement.

case Some(taskSet) if state == TaskState.LOST =>
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
Contributor Author

It would always be safe to just do a blind get() here, but I added the getOrElse and exception to explain why this is safe.

}
}
taskIdToTaskSetManager.get(tid) match {
taskSet.removeRunningTask(tid)
Contributor Author

Previously, these lines would be executed for the TaskState.LOST case by continuing onwards after the if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) block finished. The problem with doing that here is that my changes in removeExecutor() will have already removed this task from taskIdToTaskSetManager, so you'd get an incorrect "Ignoring update with state ..." error. By moving the logic from that original if case into this case Some(...) if ... case, it's easy to keep the shared logic for the "unknown task id" case while avoiding hitting that case spuriously.

}
}
}
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
Contributor Author

Here, TaskState.LOST will have already been handled by the new case above, so I removed it here because this case will never be hit.

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69060 has finished for PR 15986 at commit e26b7b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69062 has finished for PR 15986 at commit cd04c1a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
Contributor

The handling of var failedExecutor and var reason, while not wholly new, appears odd. They are only set within the TaskState.LOST case, and only used within that case or in the post-try block protected by failedExecutor.isDefined. Seems like that post-try logic can be moved into the TaskState.LOST case and the handling of failedExecutor and reason simplified.

Contributor Author

I agree. I originally held off on doing this because I wanted to minimize code movement, but at this phase I think it's worth doing.

Contributor Author

One bit of trickiness is the comment after the try block:

    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {
...

I assume this comment is referring to the synchronized surrounding the try, so I don't think that we'll be able to simplify this much while still preserving this contract. Therefore, ugly as this is, I'd prefer to keep this weird structure for now.
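
To spell out the contract being preserved, here is a hedged standalone sketch of the pattern (not the real statusUpdate code):

object LockOrderingSketch {
  private def notifyDagSchedulerOfLoss(execId: String): Unit = ()  // stand-in

  // Mutate scheduler state under the scheduler's own lock, but remember any
  // failed executor and only call out to the DAGScheduler after the lock is
  // released, avoiding lock-ordering deadlocks between the two components.
  def statusUpdateSketch(lostExecId: Option[String]): Unit = {
    var failedExecutor: Option[String] = None
    this.synchronized {
      // ... update taskId -> * maps here ...
      failedExecutor = lostExecId
    }
    // Outside the synchronized block: safe to call into the DAGScheduler.
    failedExecutor.foreach(notifyDagSchedulerOfLoss)
  }
}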

Contributor

Yup, good point.

override def executorAdded(execId: String, host: String) {}
}

val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
Contributor

nit: remove new


def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
def runningTasksByExecutors(): Map[String, Int] = synchronized {
Contributor

Is there a reason why this shouldn't be def runningTasksByExecutors: Map[String, Int] -- it's not a mutator.

Contributor Author
@JoshRosen JoshRosen Nov 23, 2016

No good reason; just a carryover from the old code. I also didn't want to have to update the existing call sites.

@markhamstra
Contributor

The code looks pretty good @JoshRosen , but I still want to spend some time looking at your standalone end-to-end reproduction to get more familiar with the details.

Contributor
@kayousterhout kayousterhout left a comment

Can you do the "synchronized" change in a separate PR? Worried about it getting lost here (/ being harder to track), and it seems easy to do separately after this is merged since it's just one line.

Otherwise, I looked over this carefully and it looks good. Left a few suggestions to improve readability and try to avoid more issues in the future.


def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
def runningTasksByExecutors(): Map[String, Int] = synchronized {
executorIdToRunningTaskIds.toMap.mapValues(_.size)
Contributor

Why do you need the toMap here?

Contributor Author

We need this to convert from a mutable to an immutable map.

Contributor
@kayousterhout kayousterhout Nov 23, 2016

Sorry should have been more specific: why does the map need to be immutable? Just maintaining the old API?

Contributor Author

Yes. I think it's also needed for thread-safety. If we return a mutable map to the caller and they then go on to iterate over it then we're in trouble if that map isn't thread-safe.

Contributor

But it doesn't matter because we're creating a new map with mapValues, right?

Anyway this seems not especially important to this PR so fine to leave as-is.


if (executorIdToTaskCount.contains(execId)) {
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) if state == TaskState.LOST =>
Contributor

Can you just do "case Some(taskSet) =>" here, and then make a new block for the "if state == TaskState.LOST" and later the "else"? I find that easier to read than having this case statement and then the later "case Some(taskSet)" (since it's more obvious that we're doing some stuff, if the task set exists, and throwing an error otherwise)

Contributor Author

Sure, can do. I held off on doing this originally because it would involve a lot more code movement and might have made the changes harder to review.

Contributor

Ah got it -- I appreciate that! But I do think that makes sense to go ahead and do.

// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)

if (executorIdToTaskCount.contains(execId)) {
Contributor

So just to check, this line was redundant before, right, which is why it could be removed? It seems like as long as "taskIdToExecutorId.contains(tid)" evaluates to true in the old code, this should have evaluated to true also.

Contributor Author

Actually, we might want to keep this in case the Mesos scheduler sends us duplicate task failure events so that we don't try to remove an already-removed executor. I'll put this back in with an explanatory comment in my next commit.

@@ -525,7 +527,14 @@ private[spark] class TaskSchedulerImpl(
* of any running tasks, since the loss reason defines whether we'll fail those tasks.
*/
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
executorIdToTaskCount -= executorId
executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
Contributor

Can you add a comment here -- something like

The tasks on the lost executor may not send any more status updates (because the executor has been lost), so they should be cleaned up here.

s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
taskIds.foreach { tid =>
taskIdToExecutorId.remove(tid)
taskIdToTaskSetManager.remove(tid)
Contributor

What about moving the two above lines to a "cleanupTaskState" method (or something like that) that can be called here and in statusUpdate? I wonder if that will help us avoid state tracking issues in the future -- because it's clear that the same cleanup should be happening in both places.

Can you also add a comment here saying that we only need to cleanup state because telling the TSM that the task has failed happens via the rootPool.executorLost call?
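
A rough sketch of what such a shared helper could look like (hypothetical name, standalone toy maps instead of the real scheduler fields):

import scala.collection.mutable.{HashMap, HashSet}

object CleanupHelperSketch {
  val taskIdToTaskSetName = new HashMap[Long, String]
  val taskIdToExecutorId = new HashMap[Long, String]
  val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  // One shared place that forgets a task id, callable from both statusUpdate
  // and removeExecutor, so the two paths cannot drift apart in what they clean up.
  def cleanupTaskState(tid: Long): Unit = {
    taskIdToTaskSetName.remove(tid)
    taskIdToExecutorId.remove(tid).foreach { execId =>
      executorIdToRunningTaskIds.get(execId).foreach(_ -= tid)
    }
  }
}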


@JoshRosen
Contributor Author

Thanks for the review pass. I'll split the thread-safety fix into a separate PR and will work on addressing your comments now.

I'll see about adding new unit tests to cover the statusUpdate case.

@markhamstra
Contributor

If Kay is happy with the last couple of changes, then I'm fine with this, too. The only tiny nit I've still got is a change from runningTasksByExecutors() to runningTasksByExecutors. Outside of this PR, there's only a single call site in SparkStatusTracker, so the fix is pretty trivial -- but so is this issue itself, so I don't really care much if it stays as is.

Contributor
@kayousterhout kayousterhout left a comment

LGTM

@kayousterhout
Contributor

Thanks for all of the work on this Josh! Happy to review the version for master if the merge isn't clean.

@JoshRosen
Contributor Author

@markhamstra, I'll fix that in a followup patch to address the pre-existing synchronization / thread-safety issue impacting that method.

@markhamstra
Contributor

Thanks, @JoshRosen

@JoshRosen
Contributor Author

Alright, I'm going to merge this now and will open backport PRs against the other branches. Thanks for reviewing!

asfgit pushed a commit that referenced this pull request Nov 28, 2016
…executor loss


Author: Josh Rosen <joshrosen@databricks.com>

Closes #15986 from JoshRosen/fix-leak-following-total-executor-loss.
@JoshRosen JoshRosen closed this Nov 28, 2016
@JoshRosen JoshRosen deleted the fix-leak-following-total-executor-loss branch November 28, 2016 21:20
JoshRosen added a commit to JoshRosen/spark that referenced this pull request Nov 28, 2016
ghost pushed a commit to dbtsai/spark that referenced this pull request Nov 30, 2016
_This is the master branch version of apache#15986._

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16045 from JoshRosen/fix-leak-following-total-executor-loss-master.
asfgit pushed a commit that referenced this pull request Nov 30, 2016
_This is the master branch version of #15986._

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16045 from JoshRosen/fix-leak-following-total-executor-loss-master.

(cherry picked from commit 9a02f68)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
JoshRosen added a commit to JoshRosen/spark that referenced this pull request Nov 30, 2016
…executor loss


Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#15986 from JoshRosen/fix-leak-following-total-executor-loss.
asfgit pushed a commit that referenced this pull request Nov 30, 2016
…utors

## What changes were proposed in this pull request?

The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in #15986, the signature's use of parentheses is a little odd given that this is a pure getter method.

This patch fixes both issues.

## How was this patch tested?

Covered by existing tests.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.

(cherry picked from commit c51c772)
Signed-off-by: Andrew Or <andrewor14@gmail.com>
asfgit pushed a commit that referenced this pull request Nov 30, 2016
…utors


Author: Josh Rosen <joshrosen@databricks.com>

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.
asfgit pushed a commit that referenced this pull request Nov 30, 2016
…utors


Author: Josh Rosen <joshrosen@databricks.com>

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.

(cherry picked from commit c51c772)
Signed-off-by: Andrew Or <andrewor14@gmail.com>
asfgit pushed a commit that referenced this pull request Dec 1, 2016
…executor loss

_This is the branch-1.6 version of #15986._

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16070 from JoshRosen/fix-leak-following-total-executor-loss-1.6.
zzcclp pushed a commit to zzcclp/spark that referenced this pull request Dec 2, 2016
…executor loss

_This is the branch-1.6 version of apache#15986._

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16070 from JoshRosen/fix-leak-following-total-executor-loss-1.6.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
_This is the master branch version of apache#15986._

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16045 from JoshRosen/fix-leak-following-total-executor-loss-master.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
…utors

## What changes were proposed in this pull request?

The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in apache#15986, the signature's use of parentheses is a little odd given that this is a pure getter method.

This patch fixes both issues.
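
As a sketch of the shape of that fix, assuming a simplified surrounding class rather than the real `TaskSchedulerImpl`: the getter drops its parentheses because it has no side effects, and it snapshots the mutable map under `synchronized` so callers never observe it mid-mutation and cannot mutate shared state through the returned value.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Illustrative sketch: a parameterless, side-effect-free getter that takes a
// consistent, immutable snapshot of mutable per-executor state.
class RunningTaskState {
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  // No parentheses, because this is a pure accessor; synchronized, because
  // scheduler threads mutate the underlying map concurrently.
  def runningTasksByExecutors: Map[String, Int] = synchronized {
    executorIdToRunningTaskIds.map { case (execId, taskIds) => execId -> taskIds.size }.toMap
  }
}
```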

## How was this patch tested?

Covered by existing tests.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16073 from JoshRosen/runningTasksByExecutors-thread-safety.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
dosoft pushed a commit to WANdisco/spark that referenced this pull request Jan 24, 2017
…executor loss

## What changes were proposed in this pull request?

_This is the branch-1.6 version of apache#15986; the original description follows:_

This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails.

This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map.

Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here.

This patch's fix is to maintain an `executorId -> running task ids` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss.

There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes.

## How was this patch tested?

I added a new unit test to `TaskSchedulerImplSuite`.

/cc kayousterhout and markhamstra, who reviewed apache#15986.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16070 from JoshRosen/fix-leak-following-total-executor-loss-1.6.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017