[SPARK-6737] Fix memory leak in OutputCommitCoordinator #5397
Changes from all commits: 4ead1dc, 7896899, 3052aea, e96ce3a, af3b02f
@@ -49,6 +49,10 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
  * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
  * a small number of times before cancelling the whole stage.
  *
+ * Here's a checklist to use when making or reviewing changes to this class:
+ *
+ * - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
+ *   include the new structure. This will help to catch memory leaks.
  */
 private[spark]
 class DAGScheduler(
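The new checklist points reviewers at `DAGSchedulerSuite.assertDataStructuresEmpty`. That helper's contents are not part of this diff, but as a rough, hypothetical sketch, a leak-catching assertion of this kind simply checks that every bookkeeping collection is empty once all jobs have finished:

```scala
// Hypothetical sketch only: the real assertDataStructuresEmpty lives in
// DAGSchedulerSuite and its exact field list is not shown in this diff.
// The idea is to enumerate the scheduler's bookkeeping collections and fail
// the test if any of them still holds entries after a job ends.
private def assertDataStructuresEmpty(scheduler: DAGScheduler): Unit = {
  assert(scheduler.runningStages.isEmpty, "runningStages is not empty")
  assert(scheduler.waitingStages.isEmpty, "waitingStages is not empty")
  assert(scheduler.failedStages.isEmpty, "failedStages is not empty")
  // Per the checklist above: when a new per-stage or per-job structure is
  // added to DAGScheduler, add a matching assertion here.
}
```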
@@ -110,6 +114,8 @@ class DAGScheduler(
   // stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
 
+  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
+
   // A closure serializer that we reuse.
   // This is only safe because DAGScheduler runs in a single thread.
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
@@ -127,8 +133,6 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
-  private val outputCommitCoordinator = env.outputCommitCoordinator
-
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -709,9 +713,10 @@ class DAGScheduler(
       // cancelling the stages because if the DAG scheduler is stopped, the entire application
       // is in the process of getting stopped.
       val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
-      runningStages.foreach { stage =>
-        stage.latestInfo.stageFailed(stageFailedMessage)
-        listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+      // The `toArray` here is necessary so that we don't iterate over `runningStages` while
+      // mutating it.
+      runningStages.toArray.foreach { stage =>
+        markStageAsFinished(stage, Some(stageFailedMessage))
       }
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
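The new comment carries the reasoning for the `toArray`: `markStageAsFinished` now removes each stage from `runningStages`, so iterating the live set while removing from it would be unsafe. A standalone Scala sketch (not Spark code) of the same pattern:

```scala
import scala.collection.mutable

// Minimal illustration of the snapshot-before-mutate pattern used above.
// Removing elements from a mutable.HashSet while foreach-ing over it is not
// safe; copying to an Array first gives a stable collection to iterate.
object SnapshotIterationSketch {
  def main(args: Array[String]): Unit = {
    val running = mutable.HashSet("stage 0", "stage 1", "stage 2")

    // running.foreach(running -= _)     // unsafe: mutates the set mid-iteration

    running.toArray.foreach { stage =>   // safe: iterate over a snapshot
      running -= stage                   // stands in for markStageAsFinished(...)
    }
    assert(running.isEmpty)
  }
}
```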
@@ -886,10 +891,9 @@ class DAGScheduler(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
-      // Because we posted SparkListenerStageSubmitted earlier, we should post
-      // SparkListenerStageCompleted here in case there are no tasks to run.
-      outputCommitCoordinator.stageEnd(stage.id)
-      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+      // Because we posted SparkListenerStageSubmitted earlier, we should mark
+      // the stage as completed here in case there are no tasks to run
+      markStageAsFinished(stage, None)
 
       val debugString = stage match {
         case stage: ShuffleMapStage =>
@@ -901,7 +905,6 @@ class DAGScheduler(
           s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
       }
       logDebug(debugString)
-      runningStages -= stage
     }
   }
 
@@ -967,22 +970,6 @@ class DAGScheduler(
     }
 
     val stage = stageIdToStage(task.stageId)
-
-    def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
-      val serviceTime = stage.latestInfo.submissionTime match {
-        case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
-        case _ => "Unknown"
-      }
-      if (errorMessage.isEmpty) {
-        logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-        stage.latestInfo.completionTime = Some(clock.getTimeMillis())
-      } else {
-        stage.latestInfo.stageFailed(errorMessage.get)
-        logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
-      }
-      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
-      runningStages -= stage
-    }
     event.reason match {
       case Success =>
         listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
@@ -1098,7 +1085,6 @@ class DAGScheduler(
         logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
           s"due to a fetch failure from $mapStage (${mapStage.name})")
         markStageAsFinished(failedStage, Some(failureMessage))
-        runningStages -= failedStage
       }
 
       if (disallowStageRetryForTest) {
@@ -1214,6 +1200,26 @@ class DAGScheduler(
     submitWaitingStages()
   }
 
+  /**
+   * Marks a stage as finished and removes it from the list of running stages.
+   */
+  private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
+    val serviceTime = stage.latestInfo.submissionTime match {
+      case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
+      case _ => "Unknown"
+    }
+    if (errorMessage.isEmpty) {
+      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
+      stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+    } else {
+      stage.latestInfo.stageFailed(errorMessage.get)
+      logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
+    }
+    outputCommitCoordinator.stageEnd(stage.id)
+    listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+    runningStages -= stage
+  }
+
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
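This consolidated `markStageAsFinished` is the core of the leak fix: every stage-completion path now goes through one place that calls `outputCommitCoordinator.stageEnd(stage.id)` before posting `SparkListenerStageCompleted` and dropping the stage from `runningStages`. The coordinator's internals are not shown in this diff, but the pattern it relies on looks roughly like the hypothetical sketch below, where skipping `stageEnd` on any completion path leaves per-stage state behind for the lifetime of the application:

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for OutputCommitCoordinator's bookkeeping;
// the real class is not part of this diff. It only illustrates why stageEnd
// must run on every completion path: per-stage state is keyed by stage id and
// is only released here.
class PerStageStateSketch {
  // stageId -> (partitionId -> task attempt currently authorized to commit)
  private val authorizedCommitters = mutable.Map.empty[Int, mutable.Map[Int, Long]]

  def stageStart(stageId: Int): Unit = synchronized {
    authorizedCommitters(stageId) = mutable.Map.empty[Int, Long]
  }

  def stageEnd(stageId: Int): Unit = synchronized {
    authorizedCommitters -= stageId  // without this, the entry is never freed
  }
}
```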
@@ -1263,8 +1269,7 @@ class DAGScheduler(
     if (runningStages.contains(stage)) {
       try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
         taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-        stage.latestInfo.stageFailed(failureReason)
-        listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+        markStageAsFinished(stage, Some(failureReason))
       } catch {
         case e: UnsupportedOperationException =>
           logInfo(s"Could not cancel tasks for stage $stageId", e)

Review comment on `markStageAsFinished(stage, Some(failureReason))`: There's a slight change in behavior here: in the old code, we never removed the stage from `runningStages`.
Review comment: You could avoid the comment by doing: … But either is fine.