
Commit a9eea03

address Matei's comments
1 parent ac878ab commit a9eea03

2 files changed: +109 -85 lines


core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 3 deletions
@@ -1112,9 +1112,9 @@ abstract class RDD[T: ClassTag](
   @transient private var doCheckpointCalled = false
 
   /**
-   * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
-   * after a job using this RDD has completed (therefore the RDD has been materialized and
-   * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
+   * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
+   * has completed (therefore the RDD has been materialized and potentially stored in memory).
+   * doCheckpoint() is called recursively on the parent RDDs.
    */
   private[spark] def doCheckpoint() {
     if (!doCheckpointCalled) {
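Note: the reworded comment no longer names the DAGScheduler as the caller; doCheckpoint() is invoked on the final RDD after a job completes (via SparkContext.runJob). A minimal sketch of the user-facing flow, assuming a local master and a placeholder checkpoint directory:

```scala
import org.apache.spark.SparkContext

object CheckpointSketch {
  def main(args: Array[String]) {
    // Hypothetical app name and checkpoint directory, for illustration only.
    val sc = new SparkContext("local", "checkpoint-sketch")
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val rdd = sc.parallelize(1 to 1000).map(_ * 2)
    rdd.checkpoint()             // only marks the RDD; nothing is saved yet

    rdd.count()                  // the action materializes the RDD; doCheckpoint()
                                 // then runs, saves it, and truncates its lineage
    println(rdd.isCheckpointed)  // true once the first action has completed
    sc.stop()
  }
}
```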

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 106 additions & 82 deletions
@@ -498,12 +498,16 @@ class DAGScheduler(
    * the last fetch failure.
    */
   private[scheduler] def resubmitFailedStages() {
-    logInfo("Resubmitting failed stages")
-    clearCacheLocs()
-    val failedStagesCopy = failedStages.toArray
-    failedStages.clear()
-    for (stage <- failedStagesCopy.sortBy(_.jobId)) {
-      submitStage(stage)
+    if (failedStages.size > 0) {
+      // Failed stages may be removed by job cancellation, so failedStages might be empty
+      // even if the ResubmitFailedStages event has been scheduled.
+      logInfo("Resubmitting failed stages")
+      clearCacheLocs()
+      val failedStagesCopy = failedStages.toArray
+      failedStages.clear()
+      for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+        submitStage(stage)
+      }
     }
   }

@@ -582,6 +586,91 @@ class DAGScheduler(
     }
   }
 
+  private[scheduler] def handleJobGroupCancelled(groupId: String) {
+    // Cancel all jobs belonging to this job group: first find all active jobs with this
+    // group id, then cancel each of those jobs and kill their stages.
+    val activeInGroup = activeJobs.filter(activeJob =>
+      groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+    val jobIds = activeInGroup.map(_.jobId)
+    jobIds.foreach(handleJobCancellation(_, "part of cancel job group"))
+  }
+
+  private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
+    for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) {
+      if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
+          !stageInfo.emittedTaskSizeWarning) {
+        stageInfo.emittedTaskSizeWarning = true
+        logWarning(("Stage %d (%s) contains a task of very large " +
+          "size (%d KB). The maximum recommended task size is %d KB.").format(
+          task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
+          DAGScheduler.TASK_SIZE_TO_WARN))
+      }
+    }
+    listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+  }
+
+  private[scheduler] def handleTaskSetFailed(taskSet: TaskSet, reason: String) {
+    stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
+  }
+
+  private[scheduler] def cleanUpAfterSchedulerStop() {
+    for (job <- activeJobs) {
+      val error = new SparkException("Job cancelled because SparkContext was shut down")
+      job.listener.jobFailed(error)
+      // Tell the listeners that all of the running stages have ended. Don't bother
+      // cancelling the stages because if the DAG scheduler is stopped, the entire application
+      // is in the process of getting stopped.
+      val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
+      runningStages.foreach { stage =>
+        val info = stageToInfos(stage)
+        info.stageFailed(stageFailedMessage)
+        listenerBus.post(SparkListenerStageCompleted(info))
+      }
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    }
+  }
+
+  private[scheduler] def handleJobSubmitted(jobId: Int,
+      finalRDD: RDD[_],
+      func: (TaskContext, Iterator[_]) => _,
+      partitions: Array[Int],
+      allowLocal: Boolean,
+      callSite: String,
+      listener: JobListener,
+      properties: Properties = null) {
+    var finalStage: Stage = null
+    try {
+      // New stage creation may throw an exception if, for example, jobs are run on a
+      // HadoopRDD whose underlying HDFS files have been deleted.
+      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
+    } catch {
+      case e: Exception =>
+        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+        listener.jobFailed(e)
+    }
+    if (finalStage != null) {
+      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
+      clearCacheLocs()
+      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".
+        format(job.jobId, callSite, partitions.length, allowLocal))
+      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
+      logInfo("Parents of final stage: " + finalStage.parents)
+      logInfo("Missing parents: " + getMissingParentStages(finalStage))
+      if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+        // Compute very short actions like first() or take() with no parent stages locally.
+        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
+        runLocally(job)
+      } else {
+        jobIdToActiveJob(jobId) = job
+        activeJobs += job
+        resultStageToJob(finalStage) = job
+        listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
+          properties))
+        submitStage(finalStage)
+      }
+    }
+  }
+
   /** Submits stage, but first recursively submits any missing parents. */
   private[scheduler] def submitStage(stage: Stage) {
     val jobId = activeJobForStage(stage)
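Note: handleJobGroupCancelled above is reached through SparkContext.setJobGroup and SparkContext.cancelJobGroup, which post the JobGroupCancelled event handled by the actor further down. A hedged sketch of that round trip, assuming an existing SparkContext sc (the group id, thread, and sleep are illustrative, not from this commit):

```scala
// Submit a long-running job under a job group, then cancel the whole group.
val worker = new Thread() {
  override def run() {
    sc.setJobGroup("demo-group", "illustrative long-running jobs")  // hypothetical id
    try {
      sc.parallelize(1 to Int.MaxValue, 100).map(identity).count()
    } catch {
      case e: Exception => println("job ended early: " + e.getMessage)
    }
  }
}
worker.start()
Thread.sleep(1000)                // crude way to let the job start, for illustration
sc.cancelJobGroup("demo-group")   // ends up in handleJobGroupCancelled above
```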
@@ -673,6 +762,10 @@ class DAGScheduler(
    */
   private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
     val task = event.task
+    val stageId = task.stageId
+    val taskType = Utils.getFormattedClassName(task)
+    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+      event.taskMetrics))
     if (!stageIdToStage.contains(task.stageId)) {
       // Skip all the actions if the stage has been cancelled.
       return
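Note: moving the SparkListenerTaskEnd post into handleTaskCompletion keeps the listener notification next to the rest of the completion logic. A hedged sketch of observing those events with a custom listener, assuming an existing SparkContext sc (the counter is illustrative):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Counts the SparkListenerTaskEnd events posted above.
class TaskEndCounter extends SparkListener {
  @volatile var tasksEnded = 0
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    tasksEnded += 1
    println("task ended in stage " + taskEnd.stageId + " (" + taskEnd.taskType + ")")
  }
}

val counter = new TaskEndCounter
sc.addSparkListener(counter)
sc.parallelize(1 to 100, 4).count()   // four tasks, so four onTaskEnd callbacks
```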
@@ -1045,36 +1138,8 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule
    */
   def receive = {
     case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
-      var finalStage: Stage = null
-      try {
-        // New stage creation may throw an exception if, for example, jobs are run on a
-        // HadoopRDD whose underlying HDFS files have been deleted.
-        finalStage = dagScheduler.newStage(rdd, partitions.size, None, jobId, Some(callSite))
-      } catch {
-        case e: Exception =>
-          logWarning("Creating new stage failed due to exception - job: " + jobId, e)
-          listener.jobFailed(e)
-      }
-      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
-      dagScheduler.clearCacheLocs()
-      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".
-        format(job.jobId, callSite, partitions.length, allowLocal))
-      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
-      logInfo("Parents of final stage: " + finalStage.parents)
-      logInfo("Missing parents: " + dagScheduler.getMissingParentStages(finalStage))
-      if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
-        // Compute very short actions like first() or take() with no parent stages locally.
-        dagScheduler.listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
-        dagScheduler.runLocally(job)
-      } else {
-        dagScheduler.jobIdToActiveJob(jobId) = job
-        dagScheduler.activeJobs += job
-        dagScheduler.resultStageToJob(finalStage) = job
-        dagScheduler.listenerBus.post(
-          SparkListenerJobStart(job.jobId, dagScheduler.jobIdToStageIds(jobId).toArray,
-            properties))
-        dagScheduler.submitStage(finalStage)
-      }
+      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
+        listener, properties)
 
     case StageCancelled(stageId) =>
       dagScheduler.handleStageCancellation(stageId)
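Note: the JobSubmitted case now just forwards to handleJobSubmitted, which keeps the allowLocal fast path: a single-partition job with no parent stages is run with runLocally on the driver rather than being scheduled as a stage. A hedged illustration of which actions can take that path, assuming an existing SparkContext sc:

```scala
val rdd = sc.parallelize(1 to 1000, 8)

rdd.first()                 // one partition, no parent stages -> eligible for runLocally
rdd.take(1)                 // same fast path while only the first partition is scanned
rdd.count()                 // needs all 8 partitions -> submitted as a normal stage
rdd.groupBy(_ % 2).first()  // has a shuffle parent stage -> cannot run locally
```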
@@ -1083,12 +1148,7 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule
       dagScheduler.handleJobCancellation(jobId)
 
     case JobGroupCancelled(groupId) =>
-      // Cancel all jobs belonging to this job group.
-      // First finds all active jobs with this group id, and then kill stages for them.
-      val activeInGroup = dagScheduler.activeJobs.filter(activeJob =>
-        groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
-      val jobIds = activeInGroup.map(_.jobId)
-      jobIds.foreach(dagScheduler.handleJobCancellation(_, "part of cancel job group"))
+      dagScheduler.handleJobGroupCancelled(groupId)
 
     case AllJobsCancelled =>
       dagScheduler.doCancelAllJobs()
@@ -1100,60 +1160,24 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule
       dagScheduler.handleExecutorLost(execId)
 
     case BeginEvent(task, taskInfo) =>
-      for (
-        job <- dagScheduler.jobIdToActiveJob.get(task.stageId);
-        stage <- dagScheduler.stageIdToStage.get(task.stageId);
-        stageInfo <- dagScheduler.stageToInfos.get(stage)
-      ) {
-        if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
-          !stageInfo.emittedTaskSizeWarning) {
-          stageInfo.emittedTaskSizeWarning = true
-          logWarning(("Stage %d (%s) contains a task of very large " +
-            "size (%d KB). The maximum recommended task size is %d KB.").format(
-            task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
-            DAGScheduler.TASK_SIZE_TO_WARN))
-        }
-      }
-      dagScheduler.listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+      dagScheduler.handleBeginEvent(task, taskInfo)
 
     case GettingResultEvent(task, taskInfo) =>
       dagScheduler.listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
 
     case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
-      val stageId = task.stageId
-      val taskType = Utils.getFormattedClassName(task)
-      dagScheduler.listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo,
-        taskMetrics))
       dagScheduler.handleTaskCompletion(completion)
 
     case TaskSetFailed(taskSet, reason) =>
-      dagScheduler.stageIdToStage.get(taskSet.stageId).foreach {
-        dagScheduler.abortStage(_, reason) }
+      dagScheduler.handleTaskSetFailed(taskSet, reason)
 
     case ResubmitFailedStages =>
-      if (dagScheduler.failedStages.size > 0) {
-        // Failed stages may be removed by job cancellation, so failed might be empty even if
-        // the ResubmitFailedStages event has been scheduled.
-        dagScheduler.resubmitFailedStages()
-      }
+      dagScheduler.resubmitFailedStages()
   }
 
   override def postStop() {
     // Cancel any active jobs in postStop hook
-    for (job <- dagScheduler.activeJobs) {
-      val error = new SparkException("Job cancelled because SparkContext was shut down")
-      job.listener.jobFailed(error)
-      // Tell the listeners that all of the running stages have ended. Don't bother
-      // cancelling the stages because if the DAG scheduler is stopped, the entire application
-      // is in the process of getting stopped.
-      val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
-      dagScheduler.runningStages.foreach { stage =>
-        val info = dagScheduler.stageToInfos(stage)
-        info.stageFailed(stageFailedMessage)
-        dagScheduler.listenerBus.post(SparkListenerStageCompleted(info))
-      }
-      dagScheduler.listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
-    }
+    dagScheduler.cleanUpAfterSchedulerStop()
   }
 }
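Note: the task-size warning that moved into handleBeginEvent fires when a task's serialized size exceeds DAGScheduler.TASK_SIZE_TO_WARN kilobytes, which usually means a large object was captured in a closure. A hedged sketch of the usual fix, broadcasting the data instead of capturing it, assuming an existing SparkContext sc (the array size is illustrative):

```scala
// Capturing a large array ships it inside every serialized task and
// can trigger the handleBeginEvent warning:
val lookup = new Array[Byte](10 * 1024 * 1024)
sc.parallelize(1 to 100).map(i => lookup.length + i).count()

// Broadcasting ships the data to each executor once and keeps tasks small:
val lookupBc = sc.broadcast(lookup)
sc.parallelize(1 to 100).map(i => lookupBc.value.length + i).count()
```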
