[SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManager following executor loss #15986
Conversation
@@ -89,9 +89,11 @@ private[spark] class TaskSchedulerImpl(
  val nextTaskId = new AtomicLong(0)

  // Number of tasks running on each executor
  private val executorIdToTaskCount = new HashMap[String, Int]
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
To pre-emptively address any concerns about the memory usage implications of this change, note that the hash set sizes should be bounded by the number of cores / task slots on the executor, so I don't think that there's much to be gained by using a more memory-efficient HashMap structure here. The only real optimization that I could think of would be to replace the map with a fixed-size array that we just linearly scan, but that seems like premature optimization and adds a lot of hard-to-reason-about complexity.
def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
def runningTasksByExecutors(): Map[String, Int] = synchronized {
The `synchronized` here fixes a thread-safety bug added in #11888.
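To make the race concrete, here is a self-contained sketch (not the patch itself; the class name and the launch/finish helpers are simplified stand-ins) showing why the getter takes the same lock as the writers and hands back an immutable snapshot:

```scala
import scala.collection.mutable

class SchedulerBookkeeping {
  private val executorIdToRunningTaskIds = mutable.HashMap.empty[String, mutable.HashSet[Long]]

  // Writers mutate the map under the scheduler's lock.
  def taskLaunched(execId: String, tid: Long): Unit = synchronized {
    executorIdToRunningTaskIds.getOrElseUpdate(execId, mutable.HashSet.empty[Long]) += tid
  }

  def taskFinished(execId: String, tid: Long): Unit = synchronized {
    executorIdToRunningTaskIds.get(execId).foreach(_ -= tid)
  }

  // The reader takes the same lock and returns an immutable snapshot, so callers
  // can iterate the result without racing against the scheduler thread.
  def runningTasksByExecutors(): Map[String, Int] = synchronized {
    executorIdToRunningTaskIds.map { case (id, tids) => id -> tids.size }.toMap
  }
}
```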
@@ -525,7 +525,12 @@ private[spark] class TaskSchedulerImpl(
 * of any running tasks, since the loss reason defines whether we'll fail those tasks.
I'm a bit confused about this comment since it seems to suggest that we'll do some sort of per-task cleanup at some later time. If anyone knows which cleanup this is referring to, then maybe we should consider not updating `executorIdToRunningTaskIds` at all in here and instead should be performing the `taskIdToExecutorId` and `taskIdToTaskSetManager` updates somewhere else.

On the other hand, the only place where we currently remove entries from `taskIdToExecutorId` and `taskIdToTaskSetManager` is in `statusUpdate`, so my hunch is that the eventual cleanup alluded to here isn't happening in standalone mode.
Based on the "tasks are not re-scheduled while executor loss reason is pending" test in `TaskSchedulerImplSuite`, it looks like the API contract here is that if `executorLost` is called with `LossReasonPending`, it will eventually be called again with some other reason. This will cause it to call `rootPool.executorLost()`, which, in turn, will call `executorLost` for all TaskSetManagers, which will use their own internal executorId-to-task-id mappings to mark tasks as failed and inform the DAGScheduler. The `TaskSetManager` doesn't call back into the `TaskScheduler` to access any of the data in these mappings, so I think it's safe to clean them up immediately at the top of `removeExecutor` rather than putting them behind the `reason != LossReasonPending` check.

Note that it's also not as simple as just putting those updates behind `reason != LossReasonPending` as a defensive measure, because then we'd be changing the contract on when `runningTasksByExecutors()` is updated: previously, it would set a failed executor's running task count to zero as soon as the executor failed, whereas with the update behind that check it would do so only after the loss reason was known.

I think that these subtleties / distinctions are only relevant to YARN mode, so I'll loop in @vanzin to comment on them.
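As a hedged, self-contained model of the ordering being described (the field names mirror the scheduler's, but `notifyTaskSetManagers` is a stand-in for `rootPool.executorLost` and this is not Spark code):

```scala
import scala.collection.mutable

sealed trait ExecutorLossReason
case object LossReasonPending extends ExecutorLossReason
final case class SlaveLost(message: String) extends ExecutorLossReason

class LossModel {
  private val executorIdToRunningTaskIds = mutable.HashMap.empty[String, mutable.HashSet[Long]]
  private val taskIdToExecutorId = mutable.HashMap.empty[Long, String]
  private val taskIdToTaskSetId = mutable.HashMap.empty[Long, String]

  def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
    // Safe to drop the taskId -> * bookkeeping right away, even for LossReasonPending,
    // because nothing reads it back when the real reason arrives later.
    executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
      taskIds.foreach { tid =>
        taskIdToExecutorId.remove(tid)
        taskIdToTaskSetId.remove(tid)
      }
    }
    if (reason != LossReasonPending) {
      // Only once a real reason is known are the TaskSetManagers told that their
      // tasks failed (rootPool.executorLost in the real scheduler).
      notifyTaskSetManagers(executorId, reason)
    }
  }

  private def notifyTaskSetManagers(executorId: String, reason: ExecutorLossReason): Unit = ()
}
```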
This has been a while, so my memory of how all this works is a little hazy. But yes, this only applies in YARN mode. Other backends do not use `LossReasonPending`.

From a quick look at the code, `removeExecutor` maintains the host-to-executor-id mapping so that it can differentiate between an executor that has already been removed and one whose exit reason is unknown. This avoids calling `Pool.executorLost` a second time in the first case. Not sure how much that distinction matters, though; maybe it's ok to call it again and this code can be simplified.
Josh, my understanding is consistent with yours -- that if executorLost() is called with LossReasonPending, it will eventually be called again with a real loss reason. To elaborate on what Marcelo said, executorIdToHost is used as the "gatekeeper" for this: on this line, we use the existence of an entry in executorIdToHost to determine whether we need to call removeExecutor (possibly for a 2nd time).

My only concern with the current approach is the following: the task doesn't get marked as finished in the task set manager until rootPool.executorLost gets called (i.e., until the 2nd removeExecutor call). Is it possible for anything to go awry for task updates that happen between the first removeExecutor call (at which point the TaskSchedulerImpl's state for the task will be removed) and when the task is marked as failed and removed from the TSM? I looked over this and I think things should be fine -- we'll just drop the update -- but can you update the error message we log on line 369 (since now it can also happen when an update arrives after an executor has been lost, in addition to when duplicate updates are received)?
> Is it possible for anything to go awry for task updates that happen between the first removeExecutor call ... and when the task is marked as failed

If there's something really wrong with the YARN AM, it can take some time, but eventually the second call will come, either with the actual reason or with a generic "executor lost" reason. This is handled in `YarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver`.
@@ -339,7 +341,7 @@ private[spark] class TaskSchedulerImpl(
  // We lost this entire executor, so remember that it's gone
  val execId = taskIdToExecutorId(tid)

  if (executorIdToTaskCount.contains(execId)) {
  if (executorIdToRunningTaskIds.contains(execId)) {
    reason = Some(
      SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
    removeExecutor(execId, reason.get)
Following the call to `removeExecutor` here, the `taskIdToTaskSetManager.get(tid)` call immediately following this `if` will be guaranteed to return `None`. This is going to slightly change the behavior of how `TaskState.LOST` events are handled, which, given my earlier comments, I believe is only relevant to Mesos fine-grained mode (I think this is underscored by the error message `s"Task $tid was lost, so marking the executor as lost as well."`, since this only makes sense if you assume one task per executor).

Given this, I'm planning to refactor this block slightly so that the body of the `if` corresponds to the old code and so that the code following the `if` is only run for non-`TaskState.LOST` end reasons.
I added some logging in this `TaskState.LOST` branch and it looks like this case isn't hit at all in our existing scheduler tests.
cc @mgummelt
More archaeology: it looks like `TaskState.LOST` was introduced in Spark 0.6.0 as part of a refactoring to make the cluster scheduler pluggable: e72afdb. That commit is from July 2012.

At the time, standalone mode didn't even exist, the only schedulers were Mesos and local mode, and only Mesos fine-grained mode was supported. The only way to get a `TaskState.LOST` state was to convert the Mesos task-loss state to it.
To look at this another way, note that `TaskSchedulerImpl` is only called from three places:

- LocalSchedulerBackend, which won't use `TaskState.LOST`
- MesosFineGrainedSchedulerBackend, where `TaskState.LOST` means the total loss of an executor that corresponded to a single task (due to fine-grained mode)
- CoarseGrainedSchedulerBackend, where this is only called with a state that comes from a `StatusUpdate` message sent by an executor. This task state will never be `TaskState.LOST`.

Given all of this, I think that the right course of action here is to update the comments to clarify that `TaskState.LOST` is only relevant to fine-grained Mesos mode and to refactor this block to call

    taskSet.removeRunningTask(tid)
    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)

after calling `removeExecutor` for the Mesos fine-grained task, then skip the rest of the logic, which only applies to local mode or coarse-grained schedulers.
@@ -89,9 +89,11 @@ private[spark] class TaskSchedulerImpl(
  val nextTaskId = new AtomicLong(0)

  // Number of tasks running on each executor
update this comment
taskScheduler.executorLost("executor0", SlaveLost())

// Check that state associated with the lost task attempt is cleaned up:
assert(taskScheduler.taskIdToExecutorId.isEmpty)
I suppose that we should also strengthen the assertions in the existing tests to check that these maps are updated following task successes, but this may be tricky given that the existing tests aren't exercising the `statusUpdate` path. Rather, we may have to test this more end-to-end by asserting that these maps always become empty once all jobs and tasks are done.
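A sketch of the kind of end-to-end assertion being suggested, assuming a ScalaTest-style suite with a `taskScheduler` fixture like the ones used in `TaskSchedulerImplSuite` (the test name and the setup step are hypothetical):

```scala
test("scheduler bookkeeping is empty once all tasks have finished") {
  // ... launch tasks through the test harness and drive them to completion via statusUpdate ...
  assert(taskScheduler.runningTasksByExecutors().values.sum === 0)
  assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
  assert(taskScheduler.taskIdToExecutorId.isEmpty)
}
```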
Yeah, is it prohibitively difficult to add a 2nd test for when the executorLost happens as a result of a status update call (e.g., have two tasks running on a particular executor, fail one of them via statusUpdate, and make sure the other one's state gets cleaned up)? The scheduler code is such a tangled web that it would be nice to test both code paths.
I don't think that case can occur because `TaskState.LOST` should only be sent in Mesos fine-grained mode, where we'll only ever have a single task per executor (in other words, executor loss can only happen via statusUpdate in situations where there would never be concurrent tasks on an executor).
Test build #69046 has finished for PR 15986 at commit
I managed to come up with a standalone end-to-end reproduction of the shuffle file leak, allowing me to validate this patch's fix.

Run `spark-shell` against a `local-cluster` master to bring up a Spark shell with two executor JVMs. Then, execute the following:

    sc.parallelize(1 to 10, 10).map { x => Thread.sleep(1000); (x, x) }.groupByKey(10).map {
      case _ => Thread.sleep(120 * 1000); 1
    }.union(sc.parallelize(1 to 10)).count()

The sleeps keep the second stage's tasks running long enough to kill an executor while the stage is still active. Next, kill one of the executor JVMs abruptly. Wait until those zombie tasks have finished (which will happen within two minutes), then run

    $ jmap -histo 72081 | grep ShuffleDependency
     2037:             1             56  org.apache.spark.ShuffleDependency

against the driver process. This is because the zombie `TaskSetManager` is still referenced from `TaskSchedulerImpl.taskIdToTaskSetManager`, keeping the `ShuffleDependency` reachable even though the stage has failed and none of its tasks are running.

Note that while executor death seems to always leak a `TaskSetManager` this way, after applying this PR's changes you can re-run the same experiment and see that both the `TaskSetManager` and the `ShuffleDependency` get cleaned up.
Test build #69053 has finished for PR 15986 at commit
if (executorIdToTaskCount.contains(execId)) {
  executorIdToTaskCount(execId) -= 1
}
executorIdToRunningTaskIds.remove(execId)
Spotted a bug in this fix: this should actually be `executorIdToRunningTaskIds(execId).remove(tid)`.
@@ -335,31 +337,31 @@ private[spark] class TaskSchedulerImpl(
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
To understand this change, note that `taskIdToExecutorId` and `taskIdToTaskSetManager` should always have the same key set, so the `case Some(taskSet) if state == TaskState.LOST` case is equivalent to this `if` statement.
case Some(taskSet) if state == TaskState.LOST =>
  // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
  // where each executor corresponds to a single task, so mark the executor as failed.
  val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
It would always be safe to just do a blind `get()` here, but I added the `getOrElse` and exception to explain why this is safe.
}
}
taskIdToTaskSetManager.get(tid) match {
taskSet.removeRunningTask(tid)
Previously, these lines would be executed for the `TaskState.LOST` case by continuing onwards after the `if (state == TaskState.LOST && taskIdToExecutorId.contains(tid))` block finished. The problem with doing that here is that my changes in `removeExecutor()` will have already removed this task from `taskIdToTaskSetManager`, so you'd get an incorrect `Ignoring update with state ...` error. By moving the logic from that original `if` case into this `case Some(...) if ...` case, it's easy to keep the shared logic for the "unknown task id" case while avoiding hitting that case spuriously.
}
}
}
if (state == TaskState.FINISHED) {
  taskSet.removeRunningTask(tid)
  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
Here, `TaskState.LOST` will have already been handled by the new case above, so I removed it here because this case will never be hit.
Test build #69060 has finished for PR 15986 at commit
Test build #69062 has finished for PR 15986 at commit
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
  "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
reason = Some(
  SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
The handling of `var failedExecutor` and `var reason`, while not wholly new, appears odd. They are only set within the `TaskState.LOST` case, and only used within that case or in the post-try block protected by `failedExecutor.isDefined`. It seems like that post-try logic can be moved into the `TaskState.LOST` case and the handling of `failedExecutor` and `reason` simplified.
I agree. I originally held off on doing this because I wanted to minimize code movement, but at this phase I think it's worth doing.
One bit of trickiness is the comment after the `try` block:

    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {
      ...

I assume this comment is referring to the `synchronized` surrounding the `try`, so I don't think that we'll be able to simplify this much while still preserving this contract. Therefore, ugly as this is, I'd prefer to keep this weird structure for now.
Yup, good point.
override def executorAdded(execId: String, host: String) {}
}

val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
nit: remove `new`

def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
def runningTasksByExecutors(): Map[String, Int] = synchronized {
Is there a reason why this shouldn't be `def runningTasksByExecutors: Map[String, Int]` -- it's not a mutator.
No good reason; just a carryover from the old code. I also didn't want to have to update the existing call sites.
The code looks pretty good @JoshRosen, but I still want to spend some time looking at your standalone end-to-end reproduction to get more familiar with the details.
Can you do the "synchronized" change in a separate PR? Worried about it getting lost here (/ being harder to track), and it seems easy to do separately after this is merged since it's just one line.
Otherwise, I looked over this carefully and it looks good. Left a few suggestions to improve readability and try to avoid more issues in the future.
def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
def runningTasksByExecutors(): Map[String, Int] = synchronized {
  executorIdToRunningTaskIds.toMap.mapValues(_.size)
Why do you need the toMap here?
We need this to convert from a mutable map to an immutable map.
Sorry, I should have been more specific: why does the map need to be immutable? Just maintaining the old API?
Yes. I think it's also needed for thread-safety. If we return a mutable map to the caller and they then go on to iterate over it then we're in trouble if that map isn't thread-safe.
But it doesn't matter because we're creating a new map with mapValues, right?
Anyway this seems not especially important to this PR so fine to leave as-is.
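One additional wrinkle worth noting (an aside, not from the patch): on the Scala 2.11/2.12 collections used by branch-2.0, `mapValues` returns a lazy view over its receiver, so the `toMap` copy is what actually decouples the result from the mutable map. A small REPL-style illustration:

```scala
import scala.collection.mutable

val running = mutable.HashMap("exec0" -> mutable.HashSet(1L, 2L))

val viewOnly = running.mapValues(_.size)        // lazy view over the mutable map
val snapshot = running.toMap.mapValues(_.size)  // toMap copies the entries first

running("exec1") = mutable.HashSet(3L)          // a new executor shows up later

viewOnly.contains("exec1")  // true: the view still reads `running`
snapshot.contains("exec1")  // false: the copy was taken before the update
```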
if (executorIdToTaskCount.contains(execId)) {
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) if state == TaskState.LOST =>
Can you just do "case Some(taskSet) =>" here, and then make a new block for the "if state == TaskState.LOST" and later the "else"? I find that easier to read than having this case statement and then the later "case Some(taskSet)" (since it's more obvious that we're doing some stuff, if the task set exists, and throwing an error otherwise)
Sure, can do. I held off on doing this originally because it would involve a lot more code movement and might have made the changes harder to review.
Ah got it -- I appreciate that! But I do think that makes sense to go ahead and do.
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)

if (executorIdToTaskCount.contains(execId)) {
So just to check: this line was redundant before, right, which is why it could be removed? It seems like as long as "taskIdToExecutorId.contains(tid)" evaluated to true in the old code, this should have evaluated to true also.
Actually, we might want to keep this in case the Mesos scheduler sends us duplicate task failure events so that we don't try to remove an already-removed executor. I'll put this back in with an explanatory comment in my next commit.
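For reference, here is a sketch of the guarded block being described, reusing the names from the diffs above; it is an excerpt (it assumes the surrounding `statusUpdate` locals `tid`, `state`, `reason`, and `failedExecutor`), not standalone code:

```scala
case Some(taskSet) if state == TaskState.LOST =>
  // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
  // where each executor corresponds to a single task, so mark the executor as failed.
  val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
    "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
  if (executorIdToRunningTaskIds.contains(execId)) {
    // Guard against duplicate Mesos task-failure events: only remove the executor
    // if an earlier update hasn't already done so.
    reason = Some(SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
    removeExecutor(execId, reason.get)
    failedExecutor = Some(execId)
  }
```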
@@ -525,7 +527,14 @@ private[spark] class TaskSchedulerImpl(
 * of any running tasks, since the loss reason defines whether we'll fail those tasks.
 */
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
executorIdToTaskCount -= executorId
executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
Can you add a comment here -- something like: "The tasks on the lost executor may not send any more status updates (because the executor has been lost), so they should be cleaned up here."
s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId") | ||
taskIds.foreach { tid => | ||
taskIdToExecutorId.remove(tid) | ||
taskIdToTaskSetManager.remove(tid) |
What about moving the two above lines to a "cleanupTaskState" method (or something like that) that can be called here and in statusUpdate? I wonder if that will help us avoid state-tracking issues in the future -- because it makes it clear that the same cleanup should be happening in both places.

Can you also add a comment here saying that we only need to clean up state because telling the TSM that the task has failed happens via the rootPool.executorLost call?
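A hedged sketch of the helper being suggested (the name `cleanupTaskState` comes from the comment above; the exact body is an assumption based on the maps touched in this diff):

```scala
/**
 * Cleans up the TaskScheduler's bookkeeping for the given task. It does not inform the
 * TaskSetManager that the task has finished -- for an executor loss that happens via
 * the rootPool.executorLost() call instead.
 */
private def cleanupTaskState(tid: Long): Unit = {
  taskIdToTaskSetManager.remove(tid)
  taskIdToExecutorId.remove(tid).foreach { executorId =>
    executorIdToRunningTaskIds.get(executorId).foreach(_.remove(tid))
  }
}
```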
Thanks for the review pass. I'll split the thread-safety fix into a separate PR and will work on addressing your comments now. I'll see about adding new unit tests to cover the `statusUpdate` path as well.
If Kay is happy with the last couple of changes, then I'm fine with this, too. The only tiny nit I've still got is a change from `runningTasksByExecutors()` to `runningTasksByExecutors`, since it's not a mutator.
LGTM
Thanks for all of the work on this Josh! Happy to review the version for master if the merge isn't clean.

@markhamstra, I'll fix that in a followup patch to address the pre-existing synchronization / thread-safety issue impacting that method.

Thanks, @JoshRosen

Alright, I'm going to merge this now and will open backport PRs against the other branches. Thanks for reviewing!
…executor loss

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15986 from JoshRosen/fix-leak-following-total-executor-loss.
This is the master branch version of #15986.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16045 from JoshRosen/fix-leak-following-total-executor-loss-master.
(cherry picked from commit 9a02f68)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
…utors

## What changes were proposed in this pull request?

The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in #15986, the signature's use of parentheses is a little odd given that this is a pure getter method. This patch fixes both issues.

## How was this patch tested?

Covered by existing tests.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.
(cherry picked from commit c51c772)
Signed-off-by: Andrew Or <andrewor14@gmail.com>
…executor loss

This is the branch-1.6 version of #15986.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16070 from JoshRosen/fix-leak-following-total-executor-loss-1.6.
## What changes were proposed in this pull request? _This is the master branch version of apache#15986; the original description follows:_ This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. ## How was this patch tested? I added a new unit test to `TaskSchedulerImplSuite`. /cc kayousterhout and markhamstra, who reviewed apache#15986. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#16045 from JoshRosen/fix-leak-following-total-executor-loss-master.
…utors ## What changes were proposed in this pull request? The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in apache#15986, the signature's use of parentheses is a little odd given that this is a pure getter method. This patch fixes both issues. ## How was this patch tested? Covered by existing tests. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#16073 from JoshRosen/runningTasksByExecutors-thread-safety.
## What changes were proposed in this pull request? _This is the master branch version of apache#15986; the original description follows:_ This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. ## How was this patch tested? I added a new unit test to `TaskSchedulerImplSuite`. /cc kayousterhout and markhamstra, who reviewed apache#15986. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#16045 from JoshRosen/fix-leak-following-total-executor-loss-master.
…executor loss ## What changes were proposed in this pull request? _This is the master branch-1.6 version of apache#15986; the original description follows:_ This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. ## How was this patch tested? I added a new unit test to `TaskSchedulerImplSuite`. /cc kayousterhout and markhamstra, who reviewed apache#15986. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#16070 from JoshRosen/fix-leak-following-total-executor-loss-1.6.
## What changes were proposed in this pull request? _This is the master branch version of apache#15986; the original description follows:_ This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails. This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map. Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is [some code](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L338) in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and [`removeExecutor`](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L527) methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here. This patch's fix is to maintain a `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss. There are some potential corner-case interactions that I'm concerned about here, especially some details in [the comment](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L523) in `removeExecutor`, so I'd appreciate a very careful review of these changes. ## How was this patch tested? I added a new unit test to `TaskSchedulerImplSuite`. /cc kayousterhout and markhamstra, who reviewed apache#15986. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#16045 from JoshRosen/fix-leak-following-total-executor-loss-master.
…utors ## What changes were proposed in this pull request? The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in apache#15986, the signature's use of parentheses is a little odd given that this is a pure getter method. This patch fixes both issues. ## How was this patch tested? Covered by existing tests. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#16073 from JoshRosen/runningTasksByExecutors-thread-safety.
What changes were proposed in this pull request?
This patch fixes a critical resource leak in the TaskScheduler which could cause RDDs and ShuffleDependencies to be kept alive indefinitely if an executor with running tasks is permanently lost and the associated stage fails.
This problem was originally identified by analyzing the heap dump of a driver belonging to a cluster that had run out of shuffle space. This dump contained several `ShuffleDependency` instances that were retained by `TaskSetManager`s inside the scheduler but were not otherwise referenced. Each of these `TaskSetManager`s was considered a "zombie" but had no running tasks and therefore should have been cleaned up. However, these zombie task sets were still referenced by the `TaskSchedulerImpl.taskIdToTaskSetManager` map.

Entries are added to the `taskIdToTaskSetManager` map when tasks are launched and are removed inside of `TaskScheduler.statusUpdate()`, which is invoked by the scheduler backend while processing `StatusUpdate` messages from executors. The problem with this design is that a completely dead executor will never send a `StatusUpdate`. There is some code in `statusUpdate` which handles tasks that exit with the `TaskState.LOST` state (which is supposed to correspond to a task failure triggered by total executor loss), but this state only seems to be used in Mesos fine-grained mode. There doesn't seem to be any code which performs per-task state cleanup for tasks that were running on an executor that completely disappears without sending any sort of final death message. The `executorLost` and `removeExecutor` methods don't appear to perform any cleanup of the `taskId -> *` mappings, causing the leaks observed here.

This patch's fix is to maintain an `executorId -> running task id` mapping so that these `taskId -> *` maps can be properly cleaned up following an executor loss.

There are some potential corner-case interactions that I'm concerned about here, especially some details in the comment in `removeExecutor`, so I'd appreciate a very careful review of these changes.

This PR is opened against branch-2.0, where I first observed this problem, but will also need to be fixed in master, branch-2.1, and branch-1.6 (which I'll do in followup PRs after this fix is reviewed and merged).
How was this patch tested?
I added a new unit test to `TaskSchedulerImplSuite`. You can check out this PR as of 25e455e to see the failing test.

cc @kayousterhout, @markhamstra, @rxin for review.