Commit f8351e2

DO NOT MERGE
1 parent 554d678 commit f8351e2

File tree

2 files changed (+297 −51 lines)

2 files changed

+297
-51
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 52 additions & 39 deletions
@@ -1554,6 +1554,14 @@ private[spark] class DAGScheduler(
       case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
         mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
         sms.shuffleDep.newShuffleMergeState()
+      case rs: ResultStage if stage.isIndeterminate &&
+          stage.findMissingPartitions().length != rs.partitions.length =>
+        stage.makeNewStageAttempt(rs.partitions.length)
+        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
+          Utils.cloneProperties(jobIdToActiveJob(jobId).properties)))
+        abortStage(stage, "An indeterminate result stage cannot be reverted", None)
+        runningStages -= stage
+        return
       case _ =>
     }

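The added case above covers rollback for result stages: when a ResultStage is indeterminate and some of its partitions have already been computed (findMissingPartitions() returns fewer partitions than the stage has), rerunning it could silently combine outputs from different attempts, so the scheduler aborts the job instead. A minimal standalone sketch of that decision rule, using a hypothetical helper name that is not part of Spark's API:

// Sketch only: `canSafelyRerun` is a hypothetical helper, not a Spark method.
// An indeterminate result stage is safe to rerun only if *all* of its
// partitions are still missing, so no partition from an earlier attempt can
// disagree with a recomputed one.
def canSafelyRerun(isIndeterminate: Boolean, missing: Int, total: Int): Boolean =
  !isIndeterminate || missing == total

// A 4-partition indeterminate result stage with 3 partitions already delivered
// must abort, while one with nothing delivered yet can simply rerun:
assert(!canSafelyRerun(isIndeterminate = true, missing = 1, total = 4))
assert(canSafelyRerun(isIndeterminate = true, missing = 4, total = 4))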

@@ -1939,49 +1947,54 @@ private[spark] class DAGScheduler(
       case rt: ResultTask[_, _] =>
         // Cast to ResultStage here because it's part of the ResultTask
         // TODO Refactor this out to a function that accepts a ResultStage
-        val resultStage = stage.asInstanceOf[ResultStage]
-        resultStage.activeJob match {
-          case Some(job) =>
-            if (!job.finished(rt.outputId)) {
-              job.finished(rt.outputId) = true
-              job.numFinished += 1
-              // If the whole job has finished, remove it
-              if (job.numFinished == job.numPartitions) {
-                markStageAsFinished(resultStage)
-                cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
-                cleanupStateForJobAndIndependentStages(job)
+        if (failedStages.contains(stage) && stage.isIndeterminate) {
+          logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because " +
+            log"this indeterminate stage failed earlier")
+        } else {
+          val resultStage = stage.asInstanceOf[ResultStage]
+          resultStage.activeJob match {
+            case Some(job) =>
+              if (!job.finished(rt.outputId)) {
+                job.finished(rt.outputId) = true
+                job.numFinished += 1
+                // If the whole job has finished, remove it
+                if (job.numFinished == job.numPartitions) {
+                  markStageAsFinished(resultStage)
+                  cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
+                  cleanupStateForJobAndIndependentStages(job)
+                  try {
+                    // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+                    // killTask.
+                    logInfo(log"Job ${MDC(JOB_ID, job.jobId)} is finished. Cancelling " +
+                      log"potential speculative or zombie tasks for this job")
+                    // ResultStage is only used by this job. It's safe to kill speculative or
+                    // zombie tasks in this stage.
+                    taskScheduler.killAllTaskAttempts(
+                      stageId,
+                      shouldInterruptTaskThread(job),
+                      reason = "Stage finished")
+                  } catch {
+                    case e: UnsupportedOperationException =>
+                      logWarning(log"Could not cancel tasks " +
+                        log"for stage ${MDC(STAGE, stageId)}", e)
+                  }
+                  listenerBus.post(
+                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
+                }
+
+                // taskSucceeded runs some user code that might throw an exception. Make sure
+                // we are resilient against that.
                 try {
-                  // killAllTaskAttempts will fail if a SchedulerBackend does not implement
-                  // killTask.
-                  logInfo(log"Job ${MDC(JOB_ID, job.jobId)} is finished. Cancelling " +
-                    log"potential speculative or zombie tasks for this job")
-                  // ResultStage is only used by this job. It's safe to kill speculative or
-                  // zombie tasks in this stage.
-                  taskScheduler.killAllTaskAttempts(
-                    stageId,
-                    shouldInterruptTaskThread(job),
-                    reason = "Stage finished")
+                  job.listener.taskSucceeded(rt.outputId, event.result)
                 } catch {
-                  case e: UnsupportedOperationException =>
-                    logWarning(log"Could not cancel tasks " +
-                      log"for stage ${MDC(STAGE, stageId)}", e)
+                  case e: Throwable if !Utils.isFatalError(e) =>
+                    // TODO: Perhaps we want to mark the resultStage as failed?
+                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                 }
-                listenerBus.post(
-                  SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
               }
-
-              // taskSucceeded runs some user code that might throw an exception. Make sure
-              // we are resilient against that.
-              try {
-                job.listener.taskSucceeded(rt.outputId, event.result)
-              } catch {
-                case e: Throwable if !Utils.isFatalError(e) =>
-                  // TODO: Perhaps we want to mark the resultStage as failed?
-                  job.listener.jobFailed(new SparkDriverExecutionException(e))
-              }
-            }
-          case None =>
-            logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished")
+            case None =>
+              logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished")
+          }
         }

       case smt: ShuffleMapTask =>
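The second hunk gates the existing success handling: a ResultTask result that arrives after its indeterminate stage has already been registered in failedStages is logged and dropped, so output computed before the failure cannot be mixed with output from a fresh attempt. A minimal standalone sketch of that gate, using simplified stand-in types rather than the scheduler's real state:

// Sketch only: `StageInfo` and `shouldIgnoreResult` are simplified stand-ins
// for the scheduler's internal state, not Spark API.
final case class StageInfo(id: Int, isIndeterminate: Boolean)

def shouldIgnoreResult(stage: StageInfo, failedStages: Set[StageInfo]): Boolean =
  failedStages.contains(stage) && stage.isIndeterminate

val stage = StageInfo(id = 7, isIndeterminate = true)
assert(shouldIgnoreResult(stage, failedStages = Set(stage)))  // late result dropped
assert(!shouldIgnoreResult(stage, failedStages = Set.empty))  // normal path records it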
