@@ -1046,22 +1046,22 @@ class DAGScheduler(
1046
1046
1047
1047
case FetchFailed (bmAddress, shuffleId, mapId, reduceId) =>
1048
1048
val failedStage = stageIdToStage(task.stageId)
1049
- val mapStage = shuffleToMapStage(shuffleId)
1050
1049
// It is likely that we receive multiple FetchFailed for a single stage (because we have
1051
1050
// multiple tasks running concurrently on different executors). In that case, it is possible
1052
1051
// the fetch failure has already been handled by the scheduler.
1053
1052
if (runningStages.contains(failedStage)) {
1054
1053
markStageAsFinished(failedStage, Some (" Fetch failure" ))
1055
1054
runningStages -= failedStage
1056
- // TODO: Cancel running tasks in the stage
1057
- logInfo(s " Marking $failedStage ( ${failedStage.name}) for resubmision " +
1058
- s " due to a fetch failure from $mapStage ( ${mapStage.name}" )
1059
1055
}
1060
1056
1057
+ val mapStage = shuffleToMapStage(shuffleId)
1061
1058
if (failedStages.isEmpty && eventProcessActor != null ) {
1062
1059
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
1063
1060
// in that case the event will already have been scheduled. eventProcessActor may be
1064
1061
// null during unit tests.
1062
+ // TODO: Cancel running tasks in the stage
1063
+ logInfo(s " Marking $failedStage ( ${failedStage.name}) for resubmision " +
1064
+ s " due to a fetch failure from $mapStage ( ${mapStage.name}" )
1065
1065
import env .actorSystem .dispatcher
1066
1066
env.actorSystem.scheduler.scheduleOnce(
1067
1067
RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
0 commit comments