Skip to content

Commit bf71905

Browse files
rxin authored and pwendell committed
[SPARK-3224] FetchFailed reduce stages should only show up once in failed stages (in UI)
This is a HOTFIX for 1.1.

Author: Reynold Xin <rxin@apache.org>
Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #2127 from rxin/SPARK-3224 and squashes the following commits:

effb1ce [Reynold Xin] Move log message.
49282b3 [Reynold Xin] Kay's feedback.
3f01847 [Reynold Xin] Merge pull request #2 from kayousterhout/SPARK-3224
796d282 [Kay Ousterhout] Added unit test for SPARK-3224
3d3d356 [Reynold Xin] Remove map output loc even for repeated FetchFaileds.
1dd3eb5 [Reynold Xin] [SPARK-3224] FetchFailed reduce stages should only show up once in the failed stages UI.
1 parent e70aff6 commit bf71905

File tree

2 files changed

+59
-14
lines changed

2 files changed

+59
-14
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

+20-12
Original file line number | Diff line number | Diff line change
@@ -1045,31 +1045,39 @@ class DAGScheduler(
10451045
stage.pendingTasks += task
10461046

10471047
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
1048-
// Mark the stage that the reducer was in as unrunnable
10491048
val failedStage = stageIdToStage(task.stageId)
1050-
markStageAsFinished(failedStage, Some("Fetch failure"))
1051-
runningStages -= failedStage
1052-
// TODO: Cancel running tasks in the stage
1053-
logInfo("Marking " + failedStage + " (" + failedStage.name +
1054-
") for resubmision due to a fetch failure")
1055-
// Mark the map whose fetch failed as broken in the map stage
10561049
val mapStage = shuffleToMapStage(shuffleId)
1057-
if (mapId != -1) {
1058-
mapStage.removeOutputLoc(mapId, bmAddress)
1059-
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1050+
1051+
// It is likely that we receive multiple FetchFailed for a single stage (because we have
1052+
// multiple tasks running concurrently on different executors). In that case, it is possible
1053+
// the fetch failure has already been handled by the scheduler.
1054+
if (runningStages.contains(failedStage)) {
1055+
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
1056+
s"due to a fetch failure from $mapStage (${mapStage.name})")
1057+
markStageAsFinished(failedStage, Some("Fetch failure"))
1058+
runningStages -= failedStage
10601059
}
1061-
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
1062-
"); marking it for resubmission")
1060+
10631061
if (failedStages.isEmpty && eventProcessActor != null) {
10641062
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
10651063
// in that case the event will already have been scheduled. eventProcessActor may be
10661064
// null during unit tests.
1065+
// TODO: Cancel running tasks in the stage
10671066
import env.actorSystem.dispatcher
1067+
logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
1068+
s"$failedStage (${failedStage.name}) due to fetch failure")
10681069
env.actorSystem.scheduler.scheduleOnce(
10691070
RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
10701071
}
10711072
failedStages += failedStage
10721073
failedStages += mapStage
1074+
1075+
// Mark the map whose fetch failed as broken in the map stage
1076+
if (mapId != -1) {
1077+
mapStage.removeOutputLoc(mapId, bmAddress)
1078+
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1079+
}
1080+
10731081
// TODO: mark the executor as failed only if there were lots of fetch failures on it
10741082
if (bmAddress != null) {
10751083
handleExecutorLost(bmAddress.executorId, Some(task.epoch))

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

+39-2
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.scheduler
1919

20-
import scala.collection.mutable.{HashSet, HashMap, Map}
20+
import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
2121
import scala.language.reflectiveCalls
2222

2323
import akka.actor._
@@ -98,7 +98,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
9898
val WAIT_TIMEOUT_MILLIS = 10000
9999
val sparkListener = new SparkListener() {
100100
val successfulStages = new HashSet[Int]()
101-
val failedStages = new HashSet[Int]()
101+
val failedStages = new ArrayBuffer[Int]()
102102
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
103103
val stageInfo = stageCompleted.stageInfo
104104
if (stageInfo.failureReason.isEmpty) {
@@ -435,6 +435,43 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
435435
assertDataStructuresEmpty
436436
}
437437

438+
test("trivial shuffle with multiple fetch failures") {
439+
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
440+
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
441+
val shuffleId = shuffleDep.shuffleId
442+
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
443+
submit(reduceRdd, Array(0, 1))
444+
complete(taskSets(0), Seq(
445+
(Success, makeMapStatus("hostA", 1)),
446+
(Success, makeMapStatus("hostB", 1))))
447+
// The MapOutputTracker should know about both map output locations.
448+
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
449+
Array("hostA", "hostB"))
450+
451+
// The first result task fails, with a fetch failure for the output from the first mapper.
452+
runEvent(CompletionEvent(
453+
taskSets(1).tasks(0),
454+
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
455+
null,
456+
Map[Long, Any](),
457+
null,
458+
null))
459+
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
460+
assert(sparkListener.failedStages.contains(0))
461+
462+
// The second ResultTask fails, with a fetch failure for the output from the second mapper.
463+
runEvent(CompletionEvent(
464+
taskSets(1).tasks(0),
465+
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
466+
null,
467+
Map[Long, Any](),
468+
null,
469+
null))
470+
// The SparkListener should not receive redundant failure events.
471+
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
472+
assert(sparkListener.failedStages.size == 1)
473+
}
474+
438475
test("ignore late map task completions") {
439476
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
440477
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)

0 commit comments

Comments
 (0)