Skip to content

Commit 4c0b98c

Browse files
committed
[SPARK-32003] Update fileLostEpoch.
I inadvertently left out a line when transferring code. The fileLostEpoch needs to be updated with an entry for the failed executor with lost output. Adopted suggestions from wuyi and attilapiros.
1 parent 4e976ab commit 4c0b98c

File tree

2 files changed

+12
-16
lines changed

2 files changed

+12
-16
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1944,16 +1944,17 @@ private[spark] class DAGScheduler(
19441944
logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: $currentEpoch")
19451945
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
19461946
failedEpoch(execId) = currentEpoch
1947-
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
1947+
logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
19481948
blockManagerMaster.removeExecutor(execId)
19491949
}
19501950
if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) {
1951+
fileLostEpoch(execId) = currentEpoch
19511952
hostToUnregisterOutputs match {
19521953
case Some(host) =>
1953-
logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
1954+
logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
19541955
mapOutputTracker.removeOutputsOnHost(host)
19551956
case None =>
1956-
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
1957+
logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
19571958
mapOutputTracker.removeOutputsOnExecutor(execId)
19581959
}
19591960
clearCacheLocs()
@@ -1986,9 +1987,7 @@ private[spark] class DAGScheduler(
19861987
logInfo("Host added was in lost list earlier: " + host)
19871988
failedEpoch -= execId
19881989
}
1989-
if (fileLostEpoch.contains(execId)) {
1990-
fileLostEpoch -= execId
1991-
}
1990+
fileLostEpoch -= execId
19921991
}
19931992

19941993
private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 7 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -555,19 +555,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
555555
submit(reduceRdd, Array(0, 1, 2))
556556
// Map stage completes successfully,
557557
// two tasks are run on an executor on hostA and one on an executor on hostB
558-
complete(taskSets(0), Seq(
559-
(Success, makeMapStatus("hostA", 3)),
560-
(Success, makeMapStatus("hostA", 3)),
561-
(Success, makeMapStatus("hostB", 3))))
558+
completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", "hostB"))
562559
// Now the executor on hostA is lost
563560
runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed")))
564561

565562
// The MapOutputTracker has all the shuffle files
566563
val initialMapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
567-
assert(initialMapStatuses.count(_ != null) == 3)
568-
assert(initialMapStatuses(0).location.executorId === "hostA-exec")
569-
assert(initialMapStatuses(1).location.executorId === "hostA-exec")
570-
assert(initialMapStatuses(2).location.executorId === "hostB-exec")
564+
assert(initialMapStatuses.count(_ != null) === 3)
565+
assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 2)
566+
assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)
571567

572568
// Now a fetch failure from the lost executor occurs
573569
complete(taskSets(1), Seq(
@@ -576,8 +572,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
576572

577573
// Shuffle files for hostA-exec should be lost
578574
val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
579-
assert(mapStatuses.count(_ != null) == 1)
580-
assert(mapStatuses(2).location.executorId === "hostB-exec")
575+
assert(mapStatuses.count(_ != null) === 1)
576+
assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostA-exec") === 0)
577+
assert(initialMapStatuses.count(s => s != null && s.location.executorId == "hostB-exec") === 1)
581578
}
582579

583580
test("zero split job") {

0 commit comments

Comments
 (0)