Skip to content

Commit 19685bb

Browse files
committed
switch to using latestInfo.attemptId, and add comments
1 parent a5f7c8c commit 19685bb

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1126,7 +1126,10 @@ class DAGScheduler(
11261126
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
11271127
val failedStage = stageIdToStage(task.stageId)
11281128
val mapStage = shuffleToMapStage(shuffleId)
1129-
if (failedStage.attemptId - 1 > task.stageAttemptId) {
1129+
1130+
// failedStage.attemptId is already on the next attempt, so we have to use
1131+
// failedStage.latestInfo.attemptId
1132+
if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
11301133
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
11311134
s" ${task.stageAttemptId}, which has already failed")
11321135
} else {

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,13 @@ private[spark] abstract class Stage(
7777
id
7878
}
7979

80-
def attemptId: Int = nextAttemptId
80+
/**
81+
* The id for the **next** stage attempt.
82+
*
83+
* The unusual meaning of this method means its unlikely to hold the value you are interested in
84+
* -- you probably want to use [[latestInfo.attemptId]]
85+
*/
86+
private[spark] def attemptId: Int = nextAttemptId
8187

8288
override final def hashCode(): Int = id
8389
override final def equals(other: Any): Boolean = other match {

0 commit comments

Comments
 (0)