
Commit 299d992

cloud-fan authored and venkata91 committed
[SPARK-27474][CORE] avoid retrying a task failed with CommitDeniedException many times
Ref: LIHADOOP-53705

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug: a task that fails with `CommitDeniedException` gets retried many times. This can happen when a stage has two task set managers, one zombie and one active. A task from the zombie TSM completes and commits to a central coordinator (assuming it is a file-writing task). The corresponding task from the active TSM then fails with `CommitDeniedException`. Because `CommitDeniedException.countTowardsTaskFailures` is false, the active TSM keeps retrying this task until the job finishes, which wastes a lot of resources.

However, apache#23871 has a bug and was reverted in apache#24359. With hindsight, apache#23781 is fragile because we need to sync the state between `DAGScheduler` and `TaskScheduler` about which partitions are completed.

This PR proposes a new fix:
1. When `DAGScheduler` gets a task success event from an earlier attempt, it notifies `TaskSchedulerImpl` about it.
2. When `TaskSchedulerImpl` knows a partition is already completed, it asks the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet.

This fix covers the corner case because:
1. If `DAGScheduler` gets the task completion event from the zombie TSM before submitting the new stage attempt, then `DAGScheduler` knows that this partition is completed, and it will exclude this partition when creating the task set for the new stage attempt. See `DAGScheduler.submitMissingTasks`.
2. If `DAGScheduler` gets the task completion event from the zombie TSM after submitting the new stage attempt, then the active TSM is already created.

Compared to the previous fix, the message loop becomes longer, so it is likely that the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worst case, in which the task is retried many times until the job finishes. So this solution is acceptable.

Tested with a new test case.

Closes apache#24375 from cloud-fan/fix2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

RB=2113301 BUG=LIHADOOP-53705 G=spark-reviewers R=chsingh A=chsingh
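The flow described above can be summarized with a small, self-contained Scala sketch. The method names mirror the ones touched by this patch (`notifyPartitionCompletion`, `markPartitionCompleted`), but the classes below are toy stand-ins for illustration only, not the real Spark scheduler classes.

object PartitionCompletionFlowSketch {

  // Toy stand-in for a TaskSetManager: tracks which partitions are already successful.
  final class ToyTaskSetManager(val stageAttemptId: Int, numPartitions: Int, var isZombie: Boolean) {
    private val successful = Array.fill(numPartitions)(false)
    def markPartitionCompleted(partitionId: Int, taskDuration: Long): Unit = {
      if (!successful(partitionId)) {
        successful(partitionId) = true
        println(s"attempt $stageAttemptId: partition $partitionId marked finished " +
          s"(duration ${taskDuration}ms); it will not be retried")
      }
    }
  }

  // Toy stand-in for the task scheduler: only non-zombie managers of the stage are told
  // about the completion, so the active attempt stops retrying that partition.
  final class ToyTaskScheduler {
    var taskSetsByStage: Map[Int, Seq[ToyTaskSetManager]] = Map.empty
    def notifyPartitionCompletion(stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
      taskSetsByStage.getOrElse(stageId, Nil).filter(!_.isZombie).foreach { tsm =>
        tsm.markPartitionCompleted(partitionId, taskDuration)
      }
    }
  }

  // Toy stand-in for the DAGScheduler side: on a Success event from an older (zombie) stage
  // attempt, notify the task scheduler about the completed partition.
  def onTaskSuccess(sched: ToyTaskScheduler, stageId: Int, latestAttempt: Int,
      taskAttempt: Int, partitionId: Int, durationMs: Long): Unit = {
    if (taskAttempt < latestAttempt) {
      sched.notifyPartitionCompletion(stageId, partitionId, durationMs)
    }
  }

  def main(args: Array[String]): Unit = {
    val zombie = new ToyTaskSetManager(stageAttemptId = 0, numPartitions = 4, isZombie = true)
    val active = new ToyTaskSetManager(stageAttemptId = 1, numPartitions = 4, isZombie = false)
    val sched = new ToyTaskScheduler
    sched.taskSetsByStage = Map(0 -> Seq(zombie, active))
    // A task from the zombie attempt (attempt 0) finishes partition 2 while attempt 1 is active.
    onTaskSuccess(sched, stageId = 0, latestAttempt = 1, taskAttempt = 0, partitionId = 2,
      durationMs = 120L)
  }
}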
1 parent 9465025 commit 299d992

File tree: 9 files changed, 130 insertions(+), 159 deletions(-)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 0 deletions
@@ -1415,6 +1415,15 @@ class DAGScheduler(
 
     event.reason match {
       case Success =>
+        // An earlier attempt of a stage (which is zombie) may still have running tasks. If these
+        // tasks complete, they still count and we can mark the corresponding partitions as
+        // finished. Here we notify the task scheduler to skip running tasks for the same partition,
+        // to save resource.
+        if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
+          taskScheduler.notifyPartitionCompletion(
+            stageId, task.partitionId, event.taskInfo.duration)
+        }
+
         task match {
           case rt: ResultTask[_, _] =>
             // Cast to ResultStage here because it's part of the ResultTask

core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala

Lines changed: 12 additions & 0 deletions
@@ -157,6 +157,18 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
       }
     }
 
+  // This method calls `TaskSchedulerImpl.handlePartitionCompleted` asynchronously. We do not want
+  // DAGScheduler to call `TaskSchedulerImpl.handlePartitionCompleted` directly, as it's
+  // synchronized and may hurt the throughput of the scheduler.
+  def enqueuePartitionCompletionNotification(
+      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+    getTaskResultExecutor.execute(new Runnable {
+      override def run(): Unit = Utils.logUncaughtExceptions {
+        scheduler.handlePartitionCompleted(stageId, partitionId, taskDuration)
+      }
+    })
+  }
+
   def stop() {
     getTaskResultExecutor.shutdownNow()
   }
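The comment in this hunk motivates a general pattern: rather than calling a synchronized method directly from a latency-sensitive event loop, the call is handed off to a worker pool. A minimal, generic Scala sketch of that hand-off, using plain java.util.concurrent primitives and illustrative names (`handle`, `enqueue`) that are not Spark APIs, might look like this:

import java.util.concurrent.Executors

object AsyncNotificationSketch {
  private val pool = Executors.newFixedThreadPool(2)

  // Stands in for the synchronized handler (here, TaskSchedulerImpl.handlePartitionCompleted).
  def handle(stageId: Int, partitionId: Int): Unit = synchronized {
    println(s"handled completion of stage $stageId, partition $partitionId")
  }

  // Stands in for the enqueue method: the caller returns immediately, and the synchronized
  // work runs later on a pool thread, so the event loop is never blocked on the lock.
  def enqueue(stageId: Int, partitionId: Int): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = handle(stageId, partitionId)
    })
  }

  def main(args: Array[String]): Unit = {
    enqueue(stageId = 0, partitionId = 2)
    pool.shutdown()
  }
}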

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 4 additions & 0 deletions
@@ -61,6 +61,10 @@ private[spark] trait TaskScheduler {
    */
   def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
 
+  // Notify the corresponding `TaskSetManager`s of the stage, that a partition has already completed
+  // and they can skip running tasks for it.
+  def notifyPartitionCompletion(stageId: Int, partitionId: Int, taskDuration: Long)
+
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
 

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 23 additions & 18 deletions
@@ -257,6 +257,11 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  override def notifyPartitionCompletion(
+      stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+    taskResultGetter.enqueuePartitionCompletionNotification(stageId, partitionId, taskDuration)
+  }
+
   /**
    * Called to indicate that all task attempts (including speculated tasks) associated with the
    * given TaskSetManager have completed, so state associated with the TaskSetManager should be
@@ -545,6 +550,24 @@
     }
   }
 
+  /**
+   * Marks the task has completed in the active TaskSetManager for the given stage.
+   *
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
+   * If an earlier zombie attempt of a stage completes a task, we can ask the later active attempt
+   * to skip submitting and running the task for the same partition, to save resource. That also
+   * means that a task completion from an earlier zombie attempt can lead to the entire stage
+   * getting marked as successful.
+   */
+  private[scheduler] def handlePartitionCompleted(
+      stageId: Int,
+      partitionId: Int,
+      taskDuration: Long) = synchronized {
+    taskSetsByStageIdAndAttempt.get(stageId).foreach(_.values.filter(!_.isZombie).foreach { tsm =>
+      tsm.markPartitionCompleted(partitionId, taskDuration)
+    })
+  }
+
   def error(message: String) {
     synchronized {
       if (taskSetsByStageIdAndAttempt.nonEmpty) {
@@ -755,24 +778,6 @@
       manager
     }
   }
-
-  /**
-   * Marks the task has completed in all TaskSetManagers for the given stage.
-   *
-   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
-   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
-   * do not also submit those same tasks. That also means that a task completion from an earlier
-   * attempt can lead to the entire stage getting marked as successful.
-   */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(
-      stageId: Int,
-      partitionId: Int,
-      taskInfo: TaskInfo) = {
-    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, taskInfo)
-    }
-  }
-
 }
 

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 2 additions & 5 deletions
@@ -677,9 +677,6 @@
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
-    // There may be multiple tasksets for this stage -- we let all of them know that the partition
-    // was completed. This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -690,11 +687,11 @@
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskDuration: Long): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
         if (speculationEnabled && !isZombie) {
-          successfulTaskDurations.insert(taskInfo.duration)
+          successfulTaskDurations.insert(taskDuration)
         }
         tasksSuccessful += 1
         successful(index) = true

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 68 additions & 0 deletions
@@ -113,6 +113,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   /** Stages for which the DAGScheduler has called TaskScheduler.cancelTasks(). */
   val cancelledStages = new HashSet[Int]()
 
+  val tasksMarkedAsCompleted = new ArrayBuffer[Task[_]]()
+
   val taskScheduler = new TaskScheduler() {
     override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
     override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
@@ -133,6 +135,16 @@
     }
     override def killTaskAttempt(
       taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
+
+    override def notifyPartitionCompletion(
+        stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+      taskSets.filter(_.stageId == stageId).lastOption.foreach { ts =>
+        val tasks = ts.tasks.filter(_.partitionId == partitionId)
+        assert(tasks.length == 1)
+        tasksMarkedAsCompleted += tasks.head
+      }
+    }
+
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
@@ -225,6 +237,7 @@
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
+    tasksMarkedAsCompleted.clear()
     cancelledStages.clear()
     cacheLocations.clear()
     results.clear()
@@ -641,6 +654,10 @@
        taskId: Long, interruptThread: Boolean, reason: String): Boolean = {
        throw new UnsupportedOperationException
      }
+     override def notifyPartitionCompletion(
+         stageId: Int, partitionId: Int, taskDuration: Long): Unit = {
+       throw new UnsupportedOperationException
+     }
      override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
      override def defaultParallelism(): Int = 2
      override def executorHeartbeatReceived(
@@ -2459,6 +2476,57 @@
     assertDataStructuresEmpty()
   }
 
+  test("Completions in zombie tasksets update status of non-zombie taskset") {
+    val parts = 4
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, (0 until parts).toArray)
+    assert(taskSets.length == 1)
+
+    // Finish the first task of the shuffle map stage.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostA", 4),
+      Seq.empty, createFakeTaskInfoWithId(0)))
+
+    // The second task of the shuffle map stage failed with FetchFailed.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1),
+      FetchFailed(makeBlockManagerId("hostB"), shuffleDep.shuffleId, 0, 0, "ignored"),
+      null))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets.length == 2)
+    // The first partition has completed already, so the new attempt only need to run 3 tasks.
+    assert(taskSets(1).tasks.length == 3)
+
+    // Finish the first task of the second attempt of the shuffle map stage.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostA", 4),
+      Seq.empty, createFakeTaskInfoWithId(0)))
+
+    // Finish the third task of the first attempt of the shuffle map stage.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(2), Success, makeMapStatus("hostA", 4),
+      Seq.empty, createFakeTaskInfoWithId(0)))
+    assert(tasksMarkedAsCompleted.length == 1)
+    assert(tasksMarkedAsCompleted.head.partitionId == 2)
+
+    // Finish the forth task of the first attempt of the shuffle map stage.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(3), Success, makeMapStatus("hostA", 4),
+      Seq.empty, createFakeTaskInfoWithId(0)))
+    assert(tasksMarkedAsCompleted.length == 2)
+    assert(tasksMarkedAsCompleted.last.partitionId == 3)
+
+    // Now the shuffle map stage is completed, and the next stage is submitted.
+    assert(taskSets.length == 3)
+
+    // Finish
+    complete(taskSets(2), Seq((Success, 42), (Success, 42), (Success, 42), (Success, 42)))
+    assertDataStructuresEmpty()
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala

Lines changed: 2 additions & 0 deletions
@@ -83,6 +83,8 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
   override def killTaskAttempt(
     taskId: Long, interruptThread: Boolean, reason: String): Boolean = false
+  override def notifyPartitionCompletion(
+    stageId: Int, partitionId: Int, taskDuration: Long): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 0 additions & 104 deletions
@@ -1100,108 +1100,4 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       taskScheduler.initialize(new FakeSchedulerBackend)
     }
   }
-
-  test("Completions in zombie tasksets update status of non-zombie taskset") {
-    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
-    val valueSer = SparkEnv.get.serializer.newInstance()
-
-    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
-      val indexInTsm = tsm.partitionToIndex(partition)
-      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
-      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
-      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
-    }
-
-    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three active task sets for one stage. (For this to really happen,
-    // you'd need the previous stage to also get restarted, and then succeed, in between each
-    // attempt, but that happens outside what we're mocking here.)
-    val zombieAttempts = (0 until 2).map { stageAttempt =>
-      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
-      taskScheduler.submitTasks(attempt)
-      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
-      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
-      taskScheduler.resourceOffers(offers)
-      assert(tsm.runningTasks === 10)
-      // fail attempt
-      tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
-        FetchFailed(null, 0, 0, 0, "fetch failed"))
-      // the attempt is a zombie, but the tasks are still running (this could be true even if
-      // we actively killed those tasks, as killing is best-effort)
-      assert(tsm.isZombie)
-      assert(tsm.runningTasks === 9)
-      tsm
-    }
-
-    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
-    // the stage, but this time with insufficient resources so not all tasks are active.
-
-    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
-    taskScheduler.submitTasks(finalAttempt)
-    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
-    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
-    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
-      finalAttempt.tasks(task.index).partitionId
-    }.toSet
-    assert(finalTsm.runningTasks === 5)
-    assert(!finalTsm.isZombie)
-
-    // We simulate late completions from our zombie tasksets, corresponding to all the pending
-    // partitions in our final attempt. This means we're only waiting on the tasks we've already
-    // launched.
-    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
-    finalAttemptPendingPartitions.foreach { partition =>
-      completeTaskSuccessfully(zombieAttempts(0), partition)
-    }
-
-    // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
-    // remaining tasks to compute are already active in the non-zombie attempt.
-    assert(
-      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
-
-    val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted
-
-    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
-    // marked as zombie.
-    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
-    // finish the task.
-    remainingTasks.foreach { partition =>
-      val tsm = if (partition == 0) {
-        // we failed this task on both zombie attempts, this one is only present in the latest
-        // taskset
-        finalTsm
-      } else {
-        // should be active in every taskset. We choose a zombie taskset just to make sure that
-        // we transition the active taskset correctly even if the final completion comes
-        // from a zombie.
-        zombieAttempts(partition % 2)
-      }
-      completeTaskSuccessfully(tsm, partition)
-    }
-
-    assert(finalTsm.isZombie)
-
-    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
-    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
-
-    // finally, lets complete all the tasks. We simulate failures in attempt 1, but everything
-    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
-    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
-      val stageAttempt = tsm.taskSet.stageAttemptId
-      tsm.runningTasksSet.foreach { index =>
-        if (stageAttempt == 1) {
-          tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
-        } else {
-          val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
-          tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
-        }
-      }
-
-      // we update the blacklist for the stage attempts with all successful tasks. Even though
-      // some tasksets had failures, we still consider them all successful from a blacklisting
-      // perspective, as the failures weren't from a problem w/ the tasks themselves.
-      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
-    }
-  }
 }
