@@ -84,7 +84,7 @@ class DAGScheduler(
   private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
   private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
   private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
-  private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
   private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]

@@ -536,7 +536,7 @@ class DAGScheduler(
         listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
         runLocally(job)
       } else {
-        stageIdToActiveJob(jobId) = job
+        jobIdToActiveJob(jobId) = job
         activeJobs += job
         resultStageToJob(finalStage) = job
         listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
       // Cancel all running jobs.
       runningStages.map(_.jobId).foreach(handleJobCancellation)
       activeJobs.clear()      // These should already be empty by this point,
-      stageIdToActiveJob.clear()    // but just in case we lost track of some jobs...
+      jobIdToActiveJob.clear()      // but just in case we lost track of some jobs...

     case ExecutorAdded(execId, host) =>
       handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@ class DAGScheduler(

     case BeginEvent(task, taskInfo) =>
       for (
-        job <- stageIdToActiveJob.get(task.stageId);
         stage <- stageIdToStage.get(task.stageId);
         stageInfo <- stageToInfos.get(stage)
       ) {
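
Worth noting in the hunk above: the removed generator looked up `task.stageId` in a map keyed by job ID, which type-checks because both IDs are `Int`s but matches the intended job only by coincidence. This appears to be the failure mode the rename guards against. A minimal sketch, using a hypothetical `ActiveJob` stand-in rather than Spark's class:

```scala
import scala.collection.mutable.HashMap

// Hypothetical stand-in for Spark's ActiveJob, just to keep the demo self-contained.
case class ActiveJob(jobId: Int)

object KeyMixupDemo extends App {
  val jobIdToActiveJob = new HashMap[Int, ActiveJob]
  jobIdToActiveJob(42) = ActiveJob(jobId = 42)  // keyed by job ID

  val stageId = 7  // a stage belonging to job 42
  // Under the old name, a lookup like this reads plausibly but quietly misses:
  println(jobIdToActiveJob.get(stageId))  // None
  println(jobIdToActiveJob.get(42))       // Some(ActiveJob(42))
}
```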
@@ -697,7 +696,7 @@ class DAGScheduler(
   private def activeJobForStage(stage: Stage): Option[Int] = {
     if (stageIdToJobIds.contains(stage.id)) {
       val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
-      jobsThatUseStage.find(stageIdToActiveJob.contains)
+      jobsThatUseStage.find(jobIdToActiveJob.contains)
     } else {
       None
     }
@@ -750,8 +749,8 @@ class DAGScheduler(
       }
     }

-    val properties = if (stageIdToActiveJob.contains(jobId)) {
-      stageIdToActiveJob(stage.jobId).properties
+    val properties = if (jobIdToActiveJob.contains(jobId)) {
+      jobIdToActiveJob(stage.jobId).properties
     } else {
       // this stage will be assigned to "default" pool
       null
@@ -827,7 +826,7 @@ class DAGScheduler(
             job.numFinished += 1
             // If the whole job has finished, remove it
             if (job.numFinished == job.numPartitions) {
-              stageIdToActiveJob -= stage.jobId
+              jobIdToActiveJob -= stage.jobId
               activeJobs -= job
               resultStageToJob -= stage
               markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
       val independentStages = removeJobAndIndependentStages(jobId)
       independentStages.foreach(taskScheduler.cancelTasks)
       val error = new SparkException("Job %d cancelled".format(jobId))
-      val job = stageIdToActiveJob(jobId)
+      val job = jobIdToActiveJob(jobId)
       job.listener.jobFailed(error)
       jobIdToStageIds -= jobId
       activeJobs -= job
-      stageIdToActiveJob -= jobId
+      jobIdToActiveJob -= jobId
       listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
     }
   }
@@ -1011,7 +1010,7 @@ class DAGScheduler(
       val error = new SparkException("Job aborted: " + reason)
       job.listener.jobFailed(error)
       jobIdToStageIdsRemove(job.jobId)
-      stageIdToActiveJob -= resultStage.jobId
+      jobIdToActiveJob -= resultStage.jobId
       activeJobs -= job
       resultStageToJob -= resultStage
       listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
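
For context on how the renamed map is consulted, `activeJobForStage` (the hunk at line 697 above) picks the lowest-numbered job that both uses the stage and is still active. A self-contained sketch of that pattern, with a simplified value type standing in for Spark's `ActiveJob` and a plain stage ID in place of `Stage`:

```scala
import scala.collection.mutable.{HashMap, HashSet}

object ActiveJobLookupDemo extends App {
  // Stage ID -> IDs of jobs that use the stage (mirrors stageIdToJobIds).
  val stageIdToJobIds = new HashMap[Int, HashSet[Int]]
  // Job ID -> active job; a String stands in for Spark's ActiveJob here.
  val jobIdToActiveJob = new HashMap[Int, String]

  stageIdToJobIds(5) = HashSet(3, 1, 2)  // stage 5 is used by jobs 1, 2, 3
  jobIdToActiveJob(2) = "job-2"          // only jobs 2 and 3 are still active
  jobIdToActiveJob(3) = "job-3"

  // Same shape as the patched activeJobForStage: sort the job IDs and take
  // the first one still present in jobIdToActiveJob.
  def activeJobForStage(stageId: Int): Option[Int] =
    stageIdToJobIds.get(stageId)
      .flatMap(jobIds => jobIds.toArray.sorted.find(jobIdToActiveJob.contains))

  println(activeJobForStage(5))  // Some(2): the earliest still-active job
  println(activeJobForStage(9))  // None: stage not registered to any job
}
```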