
Commit db86ccb

Ngone51 authored and squito committed
[SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions
## What changes were proposed in this pull request?

This is an alternative solution for #22806.

#21131 first implemented the behavior that a task successfully completed by a zombie TaskSetManager also marks the partition as completed in the active TaskSetManager, based on the assumption that an active TaskSetManager always exists for that stage when this happens. But that's not always true: an active TaskSetManager may not have been created yet when an earlier task succeeds, which is why #22806 hit the issue.

This PR extends #21131's behavior by adding `stageIdToFinishedPartitions` to TaskSchedulerImpl, which records the finished partitions whenever a task (from a zombie or active TaskSetManager) succeeds. Thus, a later-created active TaskSetManager can also learn about the finished partitions by looking into `stageIdToFinishedPartitions` and won't launch any duplicate tasks.

## How was this patch tested?

Extended the existing zombie-taskset test in `TaskSchedulerImplSuite` to cover this scenario.

Closes #23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit e5c6143)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
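The mechanism can be illustrated with a minimal, self-contained Scala sketch (hypothetical classes, not the actual Spark `TaskSchedulerImpl`/`TaskSetManager`): the scheduler remembers finished partitions per stage, and a TaskSetManager created after those completions replays them so it never launches duplicates.

```scala
import scala.collection.mutable

// Simplified, self-contained sketch (hypothetical classes, not the actual Spark ones) of
// the idea in this commit: the scheduler remembers finished partitions per stage, so a
// TaskSetManager created *after* those completions starts with them marked as done and
// never launches duplicate tasks for them.
object FinishedPartitionsSketch {

  class TaskSetManagerSketch(val stageId: Int, numPartitions: Int) {
    private val successful = Array.fill(numPartitions)(false)

    def markPartitionCompleted(partitionId: Int): Unit =
      successful(partitionId) = true

    def pendingPartitions: Seq[Int] = successful.indices.filterNot(successful(_))
  }

  class SchedulerSketch {
    // per-stage record of finished partitions, guarded by `this`
    // (like stageIdToFinishedPartitions in TaskSchedulerImpl)
    private val stageIdToFinishedPartitions = mutable.HashMap.empty[Int, mutable.BitSet]

    // called when any attempt (zombie or active) reports a successful task
    def markPartitionCompleted(stageId: Int, partitionId: Int): Unit = synchronized {
      stageIdToFinishedPartitions.getOrElseUpdate(stageId, new mutable.BitSet) += partitionId
    }

    // called when the DAGScheduler submits a (possibly later) attempt for the stage
    def createTaskSetManager(stageId: Int, numPartitions: Int): TaskSetManagerSketch =
      synchronized {
        val tsm = new TaskSetManagerSketch(stageId, numPartitions)
        // replay completions that happened before this attempt existed
        stageIdToFinishedPartitions.get(stageId).foreach(_.foreach(tsm.markPartitionCompleted))
        tsm
      }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = new SchedulerSketch
    // tasks from a zombie attempt finish partitions 1 and 2 before the new attempt exists
    scheduler.markPartitionCompleted(stageId = 0, partitionId = 1)
    scheduler.markPartitionCompleted(stageId = 0, partitionId = 2)
    // the later attempt starts with those partitions already marked successful
    val tsm = scheduler.createTaskSetManager(stageId = 0, numPartitions = 5)
    println(tsm.pendingPartitions) // Vector(0, 3, 4)
  }
}
```

Cleanup is omitted here for brevity; in the actual change, the entry for a stage is removed from the map when the active TaskSetManager succeeds (see the TaskSetManager diff below), so the map does not grow without bound.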
1 parent 5ec4563 commit db86ccb

3 files changed: +79 -20 lines


core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 32 additions & 4 deletions
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
@@ -94,6 +94,9 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
+  // Protected by `this`
+  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
+
   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
   private val starvationTimer = new Timer(true)
@@ -236,7 +239,20 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
+    // only create a BitSet once for a certain stage since we only remove
+    // that stage when an active TaskSetManager succeed.
+    stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
+    val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
+    // TaskSet got submitted by DAGScheduler may have some already completed
+    // tasks since DAGScheduler does not always know all the tasks that have
+    // been completed by other tasksets when completing a stage, so we mark
+    // those tasks as finished here to avoid launching duplicate tasks, while
+    // holding the TaskSchedulerImpl lock.
+    // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
+    stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
+      finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
+    }
+    tsm
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -833,19 +849,31 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Marks the task has completed in all TaskSetManagers for the given stage.
+   * Marks the task has completed in all TaskSetManagers(active / zombie) for the given stage.
    *
    * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
+   * And there is also the possibility that the DAGScheduler submits another taskset at the same
+   * time as we're marking a task completed here -- that taskset would have a task for a partition
+   * that was already completed. We maintain the set of finished partitions in
+   * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
+   * is submitted. See SPARK-25250 for more details.
+   *
+   * note: this method must be called with a lock on this.
   */
  private[scheduler] def markPartitionCompletedInAllTaskSets(
      stageId: Int,
      partitionId: Int,
      taskInfo: TaskInfo) = {
+    // if we do not find a BitSet for this stage, which means an active TaskSetManager
+    // has already succeeded and removed the stage.
+    stageIdToFinishedPartitions.get(stageId).foreach{
+      finishedPartitions => finishedPartitions += partitionId
+    }
    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, taskInfo)
+      tsm.markPartitionCompleted(partitionId, Some(taskInfo))
    }
  }
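As a side note, the guarded-update idiom in this file (`getOrElseUpdate` on taskset creation, `get(...).foreach` on completion, `-=` on success) can be exercised on its own. A standalone illustration with made-up stage and partition ids, not Spark code:

```scala
import scala.collection.mutable

// Standalone illustration (made-up stage/partition ids, not Spark code) of the
// HashMap[Int, BitSet] idiom used above: getOrElseUpdate creates the per-stage BitSet at
// most once, get(...).foreach makes recording a completion a no-op after the stage entry
// has been removed, and -= drops the entry once the active attempt succeeds.
object StageBitSetIdiom {
  def main(args: Array[String]): Unit = {
    val stageIdToFinishedPartitions = mutable.HashMap.empty[Int, mutable.BitSet]

    // on taskset creation: make sure a BitSet exists for stage 0
    stageIdToFinishedPartitions.getOrElseUpdate(0, new mutable.BitSet)

    // on task success: record the partition; a missing entry means the stage already finished
    stageIdToFinishedPartitions.get(0).foreach(_ += 3)
    stageIdToFinishedPartitions.get(99).foreach(_ += 3) // no entry for stage 99, so a no-op

    println(stageIdToFinishedPartitions) // e.g. HashMap(0 -> BitSet(3))

    // when the active attempt of stage 0 succeeds: drop the whole entry
    stageIdToFinishedPartitions -= 0
    println(stageIdToFinishedPartitions) // e.g. HashMap()
  }
}
```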

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 14 additions & 5 deletions
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
 import scala.math.max
 import scala.util.control.NonFatal
 
@@ -777,7 +777,11 @@ private[spark] class TaskSetManager(
         // Mark successful and stop if all the tasks have succeeded.
         successful(index) = true
         if (tasksSuccessful == numTasks) {
-          isZombie = true
+          // clean up finished partitions for the stage when the active TaskSetManager succeed
+          if (!isZombie) {
+            sched.stageIdToFinishedPartitions -= stageId
+            isZombie = true
+          }
         }
       } else {
         logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
@@ -796,16 +800,21 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
+  private[scheduler] def markPartitionCompleted(
+      partitionId: Int,
+      taskInfo: Option[TaskInfo]): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
         if (speculationEnabled && !isZombie) {
-          successfulTaskDurations.insert(taskInfo.duration)
+          taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
         }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
-          isZombie = true
+          if (!isZombie) {
+            sched.stageIdToFinishedPartitions -= stageId
+            isZombie = true
+          }
         }
         maybeFinishTaskSet()
       }
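The signature change from `TaskInfo` to `Option[TaskInfo]` exists because completions replayed from `stageIdToFinishedPartitions` at TaskSetManager creation have no associated TaskInfo, so there is no task duration to feed into the speculation statistics. A rough standalone sketch of that pattern, using hypothetical `FakeTaskInfo`/`DurationTracker` types rather than the real classes:

```scala
// Rough standalone sketch (hypothetical FakeTaskInfo/DurationTracker types, not the real
// Spark classes) of why the signature takes Option[TaskInfo]: a completion replayed from
// stageIdToFinishedPartitions has no TaskInfo, so there is no duration to record for
// speculation and that step is skipped via foreach.
final case class FakeTaskInfo(duration: Long)

class DurationTracker {
  private val durations = scala.collection.mutable.ArrayBuffer.empty[Long]

  def markPartitionCompleted(partitionId: Int, taskInfo: Option[FakeTaskInfo]): Unit = {
    // only direct task completions contribute a duration; replayed ones pass None
    taskInfo.foreach(info => durations += info.duration)
    println(s"partition $partitionId completed, tracked durations: $durations")
  }
}

object DurationTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new DurationTracker
    tracker.markPartitionCompleted(1, Some(FakeTaskInfo(duration = 1200L))) // real completion
    tracker.markPartitionCompleted(2, None) // replayed from stageIdToFinishedPartitions
  }
}
```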

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 33 additions & 11 deletions
@@ -1102,7 +1102,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("Completions in zombie tasksets update status of non-zombie taskset") {
+  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()
 
@@ -1114,9 +1114,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three active task sets for one stage. (For this to really happen,
-    // you'd need the previous stage to also get restarted, and then succeed, in between each
-    // attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage. (For this
+    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
+    // in between each attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -1133,30 +1133,51 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       assert(tsm.runningTasks === 9)
       tsm
     }
+    // we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active
+    // attempt exists in taskScheduler by now.
+
+    // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
+    // This is possible since the behaviour of submitting new attempt and handling successful task
+    // is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
+    // separately.
+    (0 until 2).foreach { i =>
+      completeTaskSuccessfully(zombieAttempts(i), i + 1)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
+    }
 
-    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
-    // the stage, but this time with insufficient resources so not all tasks are active.
-
+    // Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread
+    // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
+    // already completed tasks. And this time with insufficient resources so not all tasks are
+    // active.
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+    // Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
+    // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch
+    // any duplicate tasks later (SPARK-25250).
+    (0 until 2).map(_ + 1).foreach { partitionId =>
+      val index = finalTsm.partitionToIndex(partitionId)
+      assert(finalTsm.successful(index))
+    }
+
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
     }.toSet
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We simulate late completions from our zombie tasksets, corresponding to all the pending
-    // partitions in our final attempt. This means we're only waiting on the tasks we've already
-    // launched.
+    // We continually simulate late completions from our zombie tasksets(but this time, there's one
+    // active attempt exists in taskScheduler), corresponding to all the pending partitions in our
+    // final attempt. This means we're only waiting on the tasks we've already launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
     // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1204,6 +1225,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
       verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
     }
+    assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
   }
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
