Skip to content

Commit 11973a7

Browse files
kayousterhout authored and pwendell committed
Renamed stageIdToActiveJob to jobIdToActiveJob.
This data structure was misused and, as a result, later renamed to an incorrect name. It seems to have gotten into this tangled state because @henrydavidge used the stage ID instead of the job ID to index into it, and @andrewor14 later renamed the data structure to reflect this misunderstanding. This patch renames it and removes an incorrect indexing into it. The incorrect indexing meant that the code added by @henrydavidge to warn when a task size is too large (added in commit 5757993) was not always executed; this commit fixes that. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #301 from kayousterhout/fixCancellation and squashes the following commits: bd3d3a4 [Kay Ousterhout] Renamed stageIdToActiveJob to jobIdToActiveJob.
1 parent ea9de65 commit 11973a7

File tree

2 files changed

+11
-12
lines changed

2 files changed

+11
-12
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class DAGScheduler(
8484
private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
8585
private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
8686
private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
87-
private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
87+
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
8888
private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
8989
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
9090

@@ -536,7 +536,7 @@ class DAGScheduler(
536536
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
537537
runLocally(job)
538538
} else {
539-
stageIdToActiveJob(jobId) = job
539+
jobIdToActiveJob(jobId) = job
540540
activeJobs += job
541541
resultStageToJob(finalStage) = job
542542
listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
559559
// Cancel all running jobs.
560560
runningStages.map(_.jobId).foreach(handleJobCancellation)
561561
activeJobs.clear() // These should already be empty by this point,
562-
stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
562+
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
563563

564564
case ExecutorAdded(execId, host) =>
565565
handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@ class DAGScheduler(
569569

570570
case BeginEvent(task, taskInfo) =>
571571
for (
572-
job <- stageIdToActiveJob.get(task.stageId);
573572
stage <- stageIdToStage.get(task.stageId);
574573
stageInfo <- stageToInfos.get(stage)
575574
) {
@@ -697,7 +696,7 @@ class DAGScheduler(
697696
private def activeJobForStage(stage: Stage): Option[Int] = {
698697
if (stageIdToJobIds.contains(stage.id)) {
699698
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
700-
jobsThatUseStage.find(stageIdToActiveJob.contains)
699+
jobsThatUseStage.find(jobIdToActiveJob.contains)
701700
} else {
702701
None
703702
}
@@ -750,8 +749,8 @@ class DAGScheduler(
750749
}
751750
}
752751

753-
val properties = if (stageIdToActiveJob.contains(jobId)) {
754-
stageIdToActiveJob(stage.jobId).properties
752+
val properties = if (jobIdToActiveJob.contains(jobId)) {
753+
jobIdToActiveJob(stage.jobId).properties
755754
} else {
756755
// this stage will be assigned to "default" pool
757756
null
@@ -827,7 +826,7 @@ class DAGScheduler(
827826
job.numFinished += 1
828827
// If the whole job has finished, remove it
829828
if (job.numFinished == job.numPartitions) {
830-
stageIdToActiveJob -= stage.jobId
829+
jobIdToActiveJob -= stage.jobId
831830
activeJobs -= job
832831
resultStageToJob -= stage
833832
markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
986985
val independentStages = removeJobAndIndependentStages(jobId)
987986
independentStages.foreach(taskScheduler.cancelTasks)
988987
val error = new SparkException("Job %d cancelled".format(jobId))
989-
val job = stageIdToActiveJob(jobId)
988+
val job = jobIdToActiveJob(jobId)
990989
job.listener.jobFailed(error)
991990
jobIdToStageIds -= jobId
992991
activeJobs -= job
993-
stageIdToActiveJob -= jobId
992+
jobIdToActiveJob -= jobId
994993
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
995994
}
996995
}
@@ -1011,7 +1010,7 @@ class DAGScheduler(
10111010
val error = new SparkException("Job aborted: " + reason)
10121011
job.listener.jobFailed(error)
10131012
jobIdToStageIdsRemove(job.jobId)
1014-
stageIdToActiveJob -= resultStage.jobId
1013+
jobIdToActiveJob -= resultStage.jobId
10151014
activeJobs -= job
10161015
resultStageToJob -= resultStage
10171016
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
428428
assert(scheduler.pendingTasks.isEmpty)
429429
assert(scheduler.activeJobs.isEmpty)
430430
assert(scheduler.failedStages.isEmpty)
431-
assert(scheduler.stageIdToActiveJob.isEmpty)
431+
assert(scheduler.jobIdToActiveJob.isEmpty)
432432
assert(scheduler.jobIdToStageIds.isEmpty)
433433
assert(scheduler.stageIdToJobIds.isEmpty)
434434
assert(scheduler.stageIdToStage.isEmpty)

0 commit comments

Comments
 (0)