@@ -1046,6 +1046,7 @@ class DAGScheduler(
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         val failedStage = stageIdToStage(task.stageId)
+        val mapStage = shuffleToMapStage(shuffleId)
         // It is likely that we receive multiple FetchFailed for a single stage (because we have
         // multiple tasks running concurrently on different executors). In that case, it is possible
         // the fetch failure has already been handled by the executor.
@@ -1056,13 +1057,6 @@
           logInfo("Marking " + failedStage + " (" + failedStage.name +
             ") for resubmision due to a fetch failure")
 
-          // Mark the map whose fetch failed as broken in the map stage
-          val mapStage = shuffleToMapStage(shuffleId)
-          if (mapId != -1) {
-            mapStage.removeOutputLoc(mapId, bmAddress)
-            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
-          }
-
           logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
             "); marking it for resubmission")
           if (failedStages.isEmpty && eventProcessActor != null) {
@@ -1076,6 +1070,13 @@
           failedStages += failedStage
           failedStages += mapStage
         }
+
+        // Mark the map whose fetch failed as broken in the map stage
+        if (mapId != -1) {
+          mapStage.removeOutputLoc(mapId, bmAddress)
+          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+        }
+
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
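For readers without the rest of DAGScheduler.scala at hand, the sketch below illustrates the ordering this patch establishes: the stale map output is unregistered whenever mapId is known, not only on the branch that marks the stages for resubmission. This is a minimal standalone Scala sketch; MapStage, MapOutputTracker, BlockManagerId, handleFetchFailed, and the stageStillRunning flag are simplified stand-ins assumed for illustration, not the real Spark types or signatures.

// Simplified stand-ins; the real Spark types (Stage, MapOutputTrackerMaster,
// BlockManagerId) carry far more state than shown here.
case class BlockManagerId(executorId: String, host: String, port: Int)

class MapStage(val name: String) {
  // mapId -> known locations of that map task's output
  private val outputLocs = scala.collection.mutable.Map[Int, List[BlockManagerId]]()
  def addOutputLoc(mapId: Int, bm: BlockManagerId): Unit =
    outputLocs(mapId) = bm :: outputLocs.getOrElse(mapId, Nil)
  def removeOutputLoc(mapId: Int, bm: BlockManagerId): Unit =
    outputLocs(mapId) = outputLocs.getOrElse(mapId, Nil).filterNot(_ == bm)
  def locsFor(mapId: Int): List[BlockManagerId] = outputLocs.getOrElse(mapId, Nil)
}

class MapOutputTracker {
  def unregisterMapOutput(shuffleId: Int, mapId: Int, bm: BlockManagerId): Unit =
    println(s"unregistered shuffle=$shuffleId map=$mapId on ${bm.executorId}")
}

object FetchFailureSketch {
  private val mapOutputTracker = new MapOutputTracker

  // Mirrors the ordering after the patch: the stale map output is cleared
  // whenever mapId is known, independent of the resubmission branch.
  def handleFetchFailed(
      mapStage: MapStage,
      stageStillRunning: Boolean,  // stands in for the guard elided from the hunk
      shuffleId: Int,
      mapId: Int,
      bmAddress: BlockManagerId): Unit = {
    if (stageStillRunning) {
      println(s"Marking ${mapStage.name} for resubmission due to a fetch failure")
      // ... failedStages += failedStage; failedStages += mapStage ...
    }
    // Mark the map whose fetch failed as broken in the map stage
    if (mapId != -1) {
      mapStage.removeOutputLoc(mapId, bmAddress)
      mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    }
  }

  def main(args: Array[String]): Unit = {
    val bm = BlockManagerId("exec-1", "host-a", 7337)
    val stage = new MapStage("map stage 0")
    stage.addOutputLoc(3, bm)
    // Even when another task's fetch failure already triggered the resubmission
    // path (stageStillRunning = false here), the stale location is still removed.
    handleFetchFailed(stage, stageStillRunning = false, shuffleId = 0, mapId = 3, bmAddress = bm)
    println(s"locations left for map 3: ${stage.locsFor(3)}")
  }
}

Running main prints the unregistration message and an empty location list even though stageStillRunning is false, which mirrors why the block was hoisted out of the resubmission branch in this change.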