Commit a1ca566

Ngone51 authored and squito committed
[SPARK-23433][SPARK-25250][CORE][BRANCH-2.3] Later created TaskSet should learn about the finished partitions
## What changes were proposed in this pull request?

This is an optional solution for #22806.

#21131 first implemented the idea that a task completed successfully by a zombie TaskSetManager can also be marked as successful in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. That is not always true: an active TaskSetManager may not have been created yet when a previous task succeeds, which is why #22806 hit the issue.

This PR extends the behavior of #21131 by adding stageIdToFinishedPartitions to TaskSchedulerImpl, which records the finished partitions of a stage whenever a task (from a zombie or active TaskSetManager) succeeds. A later created active TaskSetManager can then learn about the finished partitions by looking into stageIdToFinishedPartitions, and won't launch any duplicate tasks.

## How was this patch tested?

Updated the existing "Completions in zombie tasksets" test in TaskSchedulerImplSuite to cover the new bookkeeping.

Closes #24007 from Ngone51/dev-23433-25250-branch-2.3.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
1 parent dfde0c6 commit a1ca566
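
To make the bookkeeping concrete, below is a minimal, self-contained Scala sketch of the pattern this patch introduces: record every finished partition per stage, let a newly created attempt consult that record before launching tasks, and drop the record once the active attempt succeeds. The object and method names (`FinishedPartitionTracking`, `recordFinishedPartition`, `alreadyFinished`, `stageDone`) are illustrative placeholders, not Spark APIs; the real logic lives in `TaskSchedulerImpl` and `TaskSetManager`, as shown in the diffs below.

```scala
import scala.collection.mutable

// Illustrative model of the bookkeeping added by this patch; not Spark code.
object FinishedPartitionTracking {

  // stageId -> partitions already finished by any attempt (zombie or active) of that stage.
  // In the real patch this is `stageIdToFinishedPartitions`, protected by the
  // TaskSchedulerImpl lock; here a plain `synchronized` on this object stands in for it.
  private val stageIdToFinishedPartitions = mutable.HashMap.empty[Int, mutable.BitSet]

  // Called whenever a task succeeds, whether it belongs to a zombie or an active attempt.
  // If no BitSet exists for the stage, an active attempt already succeeded and cleaned up.
  def recordFinishedPartition(stageId: Int, partitionId: Int): Unit = synchronized {
    stageIdToFinishedPartitions.get(stageId).foreach(_ += partitionId)
  }

  // Called when a new attempt (TaskSetManager) for the stage is created: create the BitSet
  // on first use and return the partitions the new attempt can immediately mark as
  // completed, so it never launches duplicate tasks for them.
  def alreadyFinished(stageId: Int): Set[Int] = synchronized {
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new mutable.BitSet).toSet
  }

  // Called when the active attempt for the stage fully succeeds: drop the bookkeeping.
  def stageDone(stageId: Int): Unit = synchronized {
    stageIdToFinishedPartitions -= stageId
  }
}

object FinishedPartitionTrackingDemo extends App {
  // First attempt for stage 0 is created, then becomes a zombie and finishes partition 3.
  FinishedPartitionTracking.alreadyFinished(0)
  FinishedPartitionTracking.recordFinishedPartition(0, 3)
  // A later created attempt for the same stage learns about the finished partition...
  assert(FinishedPartitionTracking.alreadyFinished(0) == Set(3))
  // ...and the record is dropped once the active attempt succeeds.
  FinishedPartitionTracking.stageDone(0)
}
```

Keeping the per-stage BitSet creation, lookup, and removal under one lock is what lets a concurrently submitted attempt either see the finished partitions or find the stage already cleaned up, never a partially updated view; that is the same reason `createTaskSetManager` and `markPartitionCompletedInAllTaskSets` both run while holding the TaskSchedulerImpl lock.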

File tree

3 files changed: 79 additions & 20 deletions


core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 32 additions & 4 deletions
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
@@ -93,6 +93,9 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
+  // Protected by `this`
+  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
+
   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
   private val starvationTimer = new Timer(true)
@@ -223,7 +226,20 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
+    // Only create a BitSet once for a given stage, since we only remove
+    // the stage when an active TaskSetManager succeeds.
+    stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
+    val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
+    // A TaskSet submitted by the DAGScheduler may contain tasks that have already
+    // completed, since the DAGScheduler does not always know about all the tasks
+    // completed by other tasksets when it completes a stage. Mark those tasks as
+    // finished here, while holding the TaskSchedulerImpl lock, to avoid launching
+    // duplicate tasks.
+    // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`.
+    stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
+      finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
+    }
+    tsm
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -696,19 +712,31 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Marks the task has completed in all TaskSetManagers for the given stage.
+   * Marks the task as completed in all TaskSetManagers (active and zombie) for the given stage.
    *
    * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
+   * There is also the possibility that the DAGScheduler submits another taskset at the same
+   * time as we're marking a task completed here -- that taskset would have a task for a partition
+   * that was already completed. We maintain the set of finished partitions in
+   * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
+   * is submitted. See SPARK-25250 for more details.
+   *
+   * Note: this method must be called with a lock on this.
    */
   private[scheduler] def markPartitionCompletedInAllTaskSets(
       stageId: Int,
       partitionId: Int,
       taskInfo: TaskInfo) = {
+    // If we do not find a BitSet for this stage, an active TaskSetManager
+    // has already succeeded and removed the stage.
+    stageIdToFinishedPartitions.get(stageId).foreach {
+      finishedPartitions => finishedPartitions += partitionId
+    }
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, taskInfo)
+      tsm.markPartitionCompleted(partitionId, Some(taskInfo))
     }
   }

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 14 additions & 5 deletions
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
 import scala.math.max
 import scala.util.control.NonFatal
 
@@ -751,7 +751,11 @@ private[spark] class TaskSetManager(
       // Mark successful and stop if all the tasks have succeeded.
       successful(index) = true
       if (tasksSuccessful == numTasks) {
-        isZombie = true
+        // Clean up finished partitions for the stage when the active TaskSetManager succeeds.
+        if (!isZombie) {
+          sched.stageIdToFinishedPartitions -= stageId
+          isZombie = true
+        }
       }
     } else {
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
@@ -770,16 +774,21 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
+  private[scheduler] def markPartitionCompleted(
+      partitionId: Int,
+      taskInfo: Option[TaskInfo]): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
        if (speculationEnabled && !isZombie) {
-          successfulTaskDurations.insert(taskInfo.duration)
+          taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
        }
        tasksSuccessful += 1
        successful(index) = true
        if (tasksSuccessful == numTasks) {
-          isZombie = true
+          if (!isZombie) {
+            sched.stageIdToFinishedPartitions -= stageId
+            isZombie = true
+          }
        }
        maybeFinishTaskSet()
      }

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 33 additions & 11 deletions
@@ -929,7 +929,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("Completions in zombie tasksets update status of non-zombie taskset") {
+  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()
 
@@ -941,9 +941,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three active task sets for one stage. (For this to really happen,
-    // you'd need the previous stage to also get restarted, and then succeed, in between each
-    // attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three TaskSetManagers (2 zombie, 1 active) for one stage. (For this
+    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
+    // in between each attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -960,30 +960,51 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       assert(tsm.runningTasks === 9)
       tsm
     }
+    // We've now got 2 zombie attempts, each with 9 tasks still running, and no active
+    // attempt for the stage exists in the taskScheduler yet.
+
+    // Finish partitions 1 and 2 by completing their tasks before a new attempt for the same
+    // stage is submitted. This is possible because submitting a new attempt and handling a
+    // successful task happen on two different threads, "dag-scheduler-event-loop" and
+    // "task-result-getter" respectively.
+    (0 until 2).foreach { i =>
+      completeTaskSuccessfully(zombieAttempts(i), i + 1)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
+    }
 
-    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
-    // the stage, but this time with insufficient resources so not all tasks are active.
-
+    // Submit the 3rd attempt, still with 10 tasks. This happens due to the race between the
+    // "task-result-getter" and "dag-scheduler-event-loop" threads, where a TaskSet gets
+    // submitted with already completed tasks. This time there are insufficient resources,
+    // so not all tasks are active.
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+    // Though finalTsm gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
+    // realize that 2 tasks have already completed, and mark them appropriately, so it won't
+    // launch any duplicate tasks later (SPARK-25250).
+    (0 until 2).map(_ + 1).foreach { partitionId =>
+      val index = finalTsm.partitionToIndex(partitionId)
+      assert(finalTsm.successful(index))
+    }
+
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
     }.toSet
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We simulate late completions from our zombie tasksets, corresponding to all the pending
-    // partitions in our final attempt. This means we're only waiting on the tasks we've already
-    // launched.
+    // We continue to simulate late completions from our zombie tasksets (but this time an active
+    // attempt exists in the taskScheduler), corresponding to all the pending partitions in our
+    // final attempt. This means we're only waiting on the tasks we've already launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
    // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1031,5 +1052,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
       verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
     }
+    assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
   }
 }
