Skip to content

Commit 770b169

Browse files
committed
Fix DAGScheduler crash from getPreferredLocations() by wrapping task creation in a try block.
1 parent 44a9b55 commit 770b169

File tree

2 files changed

+24
-16
lines changed

2 files changed

+24
-16
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -905,22 +905,29 @@ class DAGScheduler(
905905
return
906906
}
907907

908-
val tasks: Seq[Task[_]] = stage match {
909-
case stage: ShuffleMapStage =>
910-
partitionsToCompute.map { id =>
911-
val locs = getPreferredLocs(stage.rdd, id)
912-
val part = stage.rdd.partitions(id)
913-
new ShuffleMapTask(stage.id, taskBinary, part, locs)
914-
}
908+
val tasks: Seq[Task[_]] = try {
909+
stage match {
910+
case stage: ShuffleMapStage =>
911+
partitionsToCompute.map { id =>
912+
val locs = getPreferredLocs(stage.rdd, id)
913+
val part = stage.rdd.partitions(id)
914+
new ShuffleMapTask(stage.id, taskBinary, part, locs)
915+
}
915916

916-
case stage: ResultStage =>
917-
val job = stage.resultOfJob.get
918-
partitionsToCompute.map { id =>
919-
val p: Int = job.partitions(id)
920-
val part = stage.rdd.partitions(p)
921-
val locs = getPreferredLocs(stage.rdd, p)
922-
new ResultTask(stage.id, taskBinary, part, locs, id)
923-
}
917+
case stage: ResultStage =>
918+
val job = stage.resultOfJob.get
919+
partitionsToCompute.map { id =>
920+
val p: Int = job.partitions(id)
921+
val part = stage.rdd.partitions(p)
922+
val locs = getPreferredLocs(stage.rdd, p)
923+
new ResultTask(stage.id, taskBinary, part, locs, id)
924+
}
925+
}
926+
} catch {
927+
case NonFatal(e) =>
928+
abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
929+
runningStages -= stage
930+
return
924931
}
925932

926933
if (tasks.size > 0) {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -800,14 +800,15 @@ class DAGSchedulerSuite
800800
}
801801

802802
test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
803-
val e1 = intercept[DAGSchedulerSuiteDummyException] {
803+
val e1 = intercept[SparkException] {
804804
val rdd = new MyRDD(sc, 2, Nil) {
805805
override def getPreferredLocations(split: Partition): Seq[String] = {
806806
throw new DAGSchedulerSuiteDummyException
807807
}
808808
}
809809
rdd.count()
810810
}
811+
assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))
811812

812813
// Make sure we can still run local commands as well as cluster commands.
813814
assert(sc.parallelize(1 to 10, 2).count() === 10)

0 commit comments

Comments (0)