[SPARK-6737] Fix memory leak in OutputCommitCoordinator #5397

Conversation
Test build #29803 has started for PR 5397 at commit
@@ -1264,6 +1270,7 @@ class DAGScheduler(
        try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
          taskScheduler.cancelTasks(stageId, shouldInterruptThread)
          stage.latestInfo.stageFailed(failureReason)
+         outputCommitCoordinator.stageEnd(stage.id)
Wouldn't it be better to put these three lines (at least) in a separate method:

private def endStage(stage: Stage): Unit = {
  stage.latestInfo.stageFailed(failureReason)
  outputCommitCoordinator.stageEnd(stage.id)
  listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}

That would make it harder to miss things like this.
It actually looks like `handleTaskCompletion` has a nested `markStageAsFinished` method that looks like it should do this. That method also removes the stage from `runningStages`, which doesn't appear to happen in all of the paths where we post `SparkListenerStageCompleted`. Let me take a look at this and see whether there's a safe way to refactor things to use this method.
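For context, a minimal sketch of what such a consolidated helper could look like, based only on the calls visible in the diffs in this thread; the exact signature and any additional bookkeeping in the real DAGScheduler method are assumptions:

```scala
// Hypothetical sketch only: a single consolidated stage-completion path inside
// DAGScheduler. The field names (runningStages, listenerBus, outputCommitCoordinator)
// come from the diffs above; everything else is illustrative.
private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
  errorMessage.foreach(reason => stage.latestInfo.stageFailed(reason)) // record failure, if any
  outputCommitCoordinator.stageEnd(stage.id)                           // free the authorizedCommitters entry
  listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))      // notify listeners exactly once
  runningStages -= stage                                               // keep scheduler bookkeeping consistent
}
```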
@vanzin, I've refactored this code to extract the stage completion code into a new `markStageAsFinished` method.
@@ -1098,7 +1085,6 @@ class DAGScheduler(
        logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
          s"due to a fetch failure from $mapStage (${mapStage.name})")
        markStageAsFinished(failedStage, Some(failureMessage))
-       runningStages -= failedStage
This was redundant, since `markStageAsFinished` removes the stage from `runningStages`.
Test build #29806 has started for PR 5397 at commit
@@ -1263,8 +1269,7 @@ class DAGScheduler(
      if (runningStages.contains(stage)) {
        try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
          taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-         stage.latestInfo.stageFailed(failureReason)
-         listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+         markStageAsFinished(stage, Some(failureReason))
There's a slight change in behavior here: in the old code, we never removed the stage from `runningStages` here even though we posted a `SparkListenerStageCompleted` event. I think this was probably a bug. In this updated code, `markStageAsFinished` will remove the stage from `runningStages`.
LGTM.
Test build #29803 has finished for PR 5397 at commit

Test PASSed.
Test build #29806 has finished for PR 5397 at commit

Test PASSed.
LGTM too.
Thanks for reviewing. I'm going to merge this into master and open a separate PR to backport to branch-1.3.
Actually, it appears that the merge conflicts with …
This patch fixes a memory leak in the DAGScheduler, which caused us to leak a map entry per submitted stage. The problem is that the OutputCommitCoordinator needs to be informed when stages end in order to remove entries from its `authorizedCommitters` map, but the DAGScheduler only called it in one of the four code paths that are used to mark stages as completed.

This patch fixes this issue by consolidating the processing of stage completion into a new `markStageAsFinished` method and updates DAGSchedulerSuite's `assertDataStructuresEmpty` assertion to also check the OutputCommitCoordinator data structures. I've also added a comment at the top of DAGScheduler so that we remember to update this test when adding new data structures.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5397 from JoshRosen/SPARK-6737 and squashes the following commits:

af3b02f [Josh Rosen] Consolidate stage completion handling code in a single method.
e96ce3a [Josh Rosen] Consolidate stage completion handling code in a single method.
3052aea [Josh Rosen] Comment update
7896899 [Josh Rosen] Fix SPARK-6737 by informing OutputCommitCoordinator of all stage end events.
4ead1dc [Josh Rosen] Add regression tests for SPARK-6737

(cherry picked from commit c83e039)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
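To make the leak concrete, here is a simplified, self-contained sketch of the pattern described above. This is not the real OutputCommitCoordinator API; the class and member names below (ToyCommitCoordinator, trackedStages) are invented for illustration, though the real coordinator does key its `authorizedCommitters` state by stage and clears it in `stageEnd`.

```scala
import scala.collection.mutable

// Simplified illustration of the leak described in the commit message.
// Every submitted stage registers a map entry; the entry is only freed when
// stageEnd is called, so any completion path that skips stageEnd leaks it.
class ToyCommitCoordinator {
  private val authorizedCommitters = mutable.Map.empty[Int, mutable.Map[Int, Long]]

  def stageStart(stageId: Int): Unit = {
    authorizedCommitters(stageId) = mutable.Map.empty // added for every submitted stage
  }

  def stageEnd(stageId: Int): Unit = {
    authorizedCommitters -= stageId                   // the only place the entry is removed
  }

  def trackedStages: Int = authorizedCommitters.size
}

object LeakDemo extends App {
  val coordinator = new ToyCommitCoordinator
  (1 to 1000).foreach(coordinator.stageStart)         // 1000 stages submitted
  // If only some completion paths call stageEnd (e.g. normal completion but not
  // cancellation or fetch failure), the remaining entries accumulate forever:
  (1 to 1000 by 4).foreach(coordinator.stageEnd)      // only 250 stages cleaned up
  println(s"stages still tracked: ${coordinator.trackedStages}") // prints 750
}
```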