
Commit 4e976ab

[SPARK-32003][CORE] Unregister outputs for executor on fetch failure after executor is lost
If an executor is lost, the `DAGScheduler` handles the loss by removing the executor, but it does not unregister the executor's outputs when the external shuffle service is in use. However, if the node on which the executor ran is lost, the shuffle service may no longer be able to serve the shuffle files. In that case, when fetches of the executor's outputs fail in the same stage, the `DAGScheduler` removes the executor again and, by rights, should unregister its outputs. It does not, because the epoch used to track the executor failure has not increased. This change tracks a separate epoch for failed executors whose failure also lost file output, so the outputs can be unregistered in this scenario.
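In short, the single guard that used to cover both the executor bookkeeping and the shuffle-output cleanup is split in two, so a fetch failure arriving at the same epoch as the earlier executor-lost event can still unregister the outputs. A minimal sketch of the before/after gating, condensed from the diff below (not the full method body):

    // Before: one guard, so output cleanup is skipped when failedEpoch
    // was already bumped by the earlier executor-lost event.
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      // ... remove executor from the block manager master ...
      if (fileLost) {
        // ... unregister shuffle outputs ...
      }
    }

    // After: file-output cleanup is gated by its own epoch map, so it still
    // runs when only the executor-lost path has seen this epoch.
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      // ... remove executor from the block manager master ...
    }
    if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) {
      // ... unregister shuffle outputs for the executor or its host ...
    }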
1 parent: 7f6a8ab

2 files changed: +56 lines, -13 lines

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 16 additions & 13 deletions
@@ -177,6 +177,8 @@ private[spark] class DAGScheduler(
   // TODO: Garbage collect information about failure epochs when we know there are no more
   // stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
+  // In addition, track epoch for failed executors that result in lost file output
+  private val fileLostEpoch = new HashMap[String, Long]
 
   private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
 
@@ -1939,24 +1941,22 @@ private[spark] class DAGScheduler(
       hostToUnregisterOutputs: Option[String],
       maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
+    logDebug(s"Removing executor $execId, fileLost: $fileLost, currentEpoch: $currentEpoch")
     if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
       blockManagerMaster.removeExecutor(execId)
-      if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
-            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
-            mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
-            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-            mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
-        clearCacheLocs()
-
-      } else {
-        logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
+    }
+    if (fileLost && (!fileLostEpoch.contains(execId) || fileLostEpoch(execId) < currentEpoch)) {
+      hostToUnregisterOutputs match {
+        case Some(host) =>
+          logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
+          mapOutputTracker.removeOutputsOnHost(host)
+        case None =>
+          logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+          mapOutputTracker.removeOutputsOnExecutor(execId)
       }
+      clearCacheLocs()
     }
   }
 
@@ -1986,6 +1986,9 @@ private[spark] class DAGScheduler(
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
+    if (fileLostEpoch.contains(execId)) {
+      fileLostEpoch -= execId
+    }
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = {
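The last hunk sits in the handler that runs when an executor registers with the scheduler. Assuming that handler is handleExecutorAdded, which is where the existing failedEpoch reset lives in DAGScheduler of this vintage, the surrounding method after the change reads roughly as follows (a sketch, not a verbatim copy):

    // A re-registered executor is no longer considered failed, so drop both
    // epoch entries and let future loss events for it be processed afresh.
    private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
      if (failedEpoch.contains(execId)) {
        logInfo("Host added was in lost list earlier: " + host)
        failedEpoch -= execId
      }
      if (fileLostEpoch.contains(execId)) {
        fileLostEpoch -= execId
      }
    }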

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 40 additions & 0 deletions
@@ -540,6 +540,46 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mapStatus2(2).location.host === "hostB")
   }
 
+  test("[SPARK-32003] All shuffle files for executor should be cleaned up on fetch failure") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    init(conf)
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on hostB
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 3)),
+      (Success, makeMapStatus("hostA", 3)),
+      (Success, makeMapStatus("hostB", 3))))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost("hostA-exec", ExecutorExited(-100, false, "Container marked as failed")))
+
+    // The MapOutputTracker has all the shuffle files
+    val initialMapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
+    assert(initialMapStatuses.count(_ != null) == 3)
+    assert(initialMapStatuses(0).location.executorId === "hostA-exec")
+    assert(initialMapStatuses(1).location.executorId === "hostA-exec")
+    assert(initialMapStatuses(2).location.executorId === "hostB-exec")
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)
+    ))
+
+    // Shuffle files for hostA-exec should be lost
+    val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
+    assert(mapStatuses.count(_ != null) == 1)
+    assert(mapStatuses(2).location.executorId === "hostB-exec")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None