
[SPARK-6737] Fix memory leak in OutputCommitCoordinator #5397


Closed
JoshRosen wants to merge 5 commits into apache:master from JoshRosen:SPARK-6737

Conversation

JoshRosen
Contributor

This patch fixes a memory leak in the DAGScheduler, which caused us to leak a map entry per submitted stage. The problem is that the OutputCommitCoordinator needs to be informed when stages end in order to remove entries from its authorizedCommitters map, but the DAGScheduler only called it in one of the four code paths that are used to mark stages as completed.

This patch fixes this issue by consolidating the processing of stage completion into a new markStageAsFinished method and updates DAGSchedulerSuite's assertDataStructuresEmpty assertion to also check the OutputCommitCoordinator data structures. I've also added a comment at the top of DAGScheduler so that we remember to update this test when adding new data structures.
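
For context, here is a minimal sketch of the shape such a consolidated helper takes, assembled from the calls visible in the diffs below (failure recording, coordinator notification, listener event, runningStages bookkeeping); it is illustrative only, not the exact Spark source:

// Sketch only -- pieced together from this PR's diffs, not the actual DAGScheduler code.
private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
  // Record the failure reason, if any, before publishing the completion event.
  errorMessage.foreach(reason => stage.latestInfo.stageFailed(reason))
  // Always tell the OutputCommitCoordinator the stage is done so its
  // authorizedCommitters entry is dropped; skipping this call in three of the
  // four completion paths is what leaked a map entry per stage.
  outputCommitCoordinator.stageEnd(stage.id)
  listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
  runningStages -= stage
}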

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29803 has started for PR 5397 at commit 3052aea.

@@ -1264,6 +1270,7 @@ class DAGScheduler(
        try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
          taskScheduler.cancelTasks(stageId, shouldInterruptThread)
          stage.latestInfo.stageFailed(failureReason)
+         outputCommitCoordinator.stageEnd(stage.id)
Contributor

Wouldn't it be better to put these three lines (at least) in a separate method:

private def endStage(stage: Stage, failureReason: String): Unit = {
  stage.latestInfo.stageFailed(failureReason)
  outputCommitCoordinator.stageEnd(stage.id)
  listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}

That would make it harder to miss things like this.

Contributor Author

It actually looks like handleTaskCompletion has a nested markStageAsFinished method that should do this. That method also removes the stage from runningStages, which doesn't appear to happen in all of the paths where we post SparkListenerStageCompleted. Let me take a look and see whether there's a safe way to refactor things to use this method.

@JoshRosen
Contributor Author

@vanzin, I've refactored this code to extract the stage completion code into a new markStageAsFinished method. There's one small change in behavior that I'll comment on inline.
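
The refactor pairs with the assertDataStructuresEmpty update mentioned in the description. A rough sketch of what that extra check could look like is below; the isEmpty-style accessor on the coordinator is assumed for illustration and is not taken from the actual patch:

// Sketch only -- the outputCommitCoordinator reference and its isEmpty helper are
// assumptions for illustration; the real DAGSchedulerSuite may expose this state differently.
private def assertDataStructuresEmpty(): Unit = {
  assert(scheduler.runningStages.isEmpty)
  assert(scheduler.waitingStages.isEmpty)
  assert(scheduler.failedStages.isEmpty)
  // Check added by this PR's test update: once every stage has ended, the
  // coordinator should hold no per-stage authorizedCommitters state.
  assert(outputCommitCoordinator.isEmpty)
}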

@@ -1098,7 +1085,6 @@ class DAGScheduler(
          logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
            s"due to a fetch failure from $mapStage (${mapStage.name})")
          markStageAsFinished(failedStage, Some(failureMessage))
-         runningStages -= failedStage
Contributor Author

This was redundant, since markStageAsFinished removes the stage from runningStages.

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29806 has started for PR 5397 at commit af3b02f.

@@ -1263,8 +1269,7 @@ class DAGScheduler(
      if (runningStages.contains(stage)) {
        try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
          taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-         stage.latestInfo.stageFailed(failureReason)
-         listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+         markStageAsFinished(stage, Some(failureReason))
Contributor Author

There's a slight change in behavior here: the old code never removed the stage from runningStages even though it posted a SparkListenerStageCompleted event. I think this was probably a bug. In the updated code, markStageAsFinished removes the stage from runningStages.

@vanzin
Contributor

vanzin commented Apr 7, 2015

LGTM.

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29803 has finished for PR 5397 at commit 3052aea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29803/

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29806 has finished for PR 5397 at commit af3b02f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29806/

@aarondav
Contributor

aarondav commented Apr 7, 2015

LGTM too.

@JoshRosen
Contributor Author

Thanks for reviewing. I'm going to merge this into master and open a separate PR to backport to branch-1.3.

asfgit closed this in c83e039 Apr 7, 2015
@JoshRosen
Contributor Author

Actually, it appears that the merge conflicts with branch-1.3 are trivial to resolve, so I'll perform a cherry-pick, run the tests locally, then monitor Jenkins for the backport commit.

asfgit pushed a commit that referenced this pull request Apr 7, 2015
This patch fixes a memory leak in the DAGScheduler, which caused us to leak a map entry per submitted stage.  The problem is that the OutputCommitCoordinator needs to be informed when stages end in order to remove entries from its `authorizedCommitters` map, but the DAGScheduler only called it in one of the four code paths that are used to mark stages as completed.

This patch fixes this issue by consolidating the processing of stage completion into a new `markStageAsFinished` method and updates DAGSchedulerSuite's `assertDataStructuresEmpty` assertion to also check the OutputCommitCoordinator data structures.  I've also added a comment at the top of DAGScheduler so that we remember to update this test when adding new data structures.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5397 from JoshRosen/SPARK-6737 and squashes the following commits:

af3b02f [Josh Rosen] Consolidate stage completion handling code in a single method.
e96ce3a [Josh Rosen] Consolidate stage completion handling code in a single method.
3052aea [Josh Rosen] Comment update
7896899 [Josh Rosen] Fix SPARK-6737 by informing OutputCommitCoordinator of all stage end events.
4ead1dc [Josh Rosen] Add regression tests for SPARK-6737

(cherry picked from commit c83e039)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala