Commit ee1f5df

address comment

1 parent f132194 commit ee1f5df

3 files changed: +63 -98 lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 13 additions & 23 deletions
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
@@ -97,7 +97,7 @@ private[spark] class TaskSchedulerImpl(
   val taskIdToExecutorId = new HashMap[Long, String]
 
   // Protected by `this`
-  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, HashSet[Int]]
+  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
 
   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
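
Partition ids are dense, non-negative integers, which is why a mutable BitSet is a natural fit here: membership costs one bit per partition instead of one boxed entry per id in a HashSet[Int]. A minimal, self-contained sketch of the access pattern (illustrative names, not Spark code):

```scala
import scala.collection.mutable.{BitSet, HashMap}

object FinishedPartitionsSketch {
  // Same shape as stageIdToFinishedPartitions: one BitSet of finished
  // partition ids per stage id.
  private val stageIdToFinishedPartitions = new HashMap[Int, BitSet]

  def markFinished(stageId: Int, partitionId: Int): Unit =
    // getOrElseUpdate lazily creates the BitSet the first time a
    // partition of this stage completes.
    stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet) += partitionId

  def isFinished(stageId: Int, partitionId: Int): Boolean =
    stageIdToFinishedPartitions.get(stageId).exists(_.contains(partitionId))

  def main(args: Array[String]): Unit = {
    markFinished(stageId = 0, partitionId = 3)
    assert(isFinished(0, 3))
    assert(!isFinished(0, 4))
  }
}
```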
@@ -211,7 +211,7 @@ private[spark] class TaskSchedulerImpl(
       taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
     stageTaskSets(taskSet.stageAttemptId) = manager
     val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
-      ts.taskSet != manager.taskSet && !ts.isZombie
+      ts.taskSet != taskSet && !ts.isZombie
     }
     if (conflictingTaskSet) {
       throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
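
This conflict check is a reference-identity test: plain Scala classes inherit `equals` from AnyRef, so `!=` compares object identity. Now that the manager holds the originally submitted TaskSet (see the createTaskSetManager change below), comparing against `taskSet` directly is equivalent to `manager.taskSet` and reads more clearly. A tiny standalone illustration of the default-equality behavior (hypothetical Holder class, not Spark's TaskSet):

```scala
// Plain Scala classes inherit equals from AnyRef: reference equality.
final class Holder(val tasks: Seq[Int])

object ReferenceEqualityDemo extends App {
  val submitted = new Holder(Seq(1, 2, 3))
  val rewrapped = new Holder(submitted.tasks) // same contents, new object

  assert(submitted != rewrapped) // != delegates to equals: identity here
  assert(submitted == submitted)
}
```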
@@ -241,13 +241,7 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    val finishedPartitions =
-      stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new HashSet[Int])
-    // filter the task which has been finished by previous attempts
-    val tasks = taskSet.tasks.filterNot{ t => finishedPartitions(t.partitionId) }
-    val ts = new TaskSet(
-      tasks, taskSet.stageId, taskSet.stageAttemptId, taskSet.priority, taskSet.properties)
-    new TaskSetManager(this, ts, maxTaskFailures, blacklistTrackerOpt)
+    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -853,24 +847,20 @@ private[spark] class TaskSchedulerImpl(
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
-   * And there's a situation that the active TaskSetManager corresponding to the stage may
-   * haven't been created at the time we call this method. And it is possible since the behaviour
-   * of calling on this method and creating active TaskSetManager is from two different threads,
-   * which are "task-result-getter" and "dag-scheduler-event-loop" separately. Consequently, under
-   * this situation, the active TaskSetManager which is created later could not learn about the
-   * finished partitions and keep on launching duplicate tasks, which may lead to job fail for some
-   * severe cases, see SPARK-25250 for details. So, to avoid the problem, we record the finished
-   * partitions for that stage here and exclude the already finished tasks when we creating active
-   * TaskSetManagers later by looking into stageIdToFinishedPartitions. Thus, active TaskSetManager
-   * could be always notified about the finished partitions whether it has been created or not at
-   * the time we call this method.
+   * And there is also the possibility that the DAGScheduler submits another taskset at the same
+   * time as we're marking a task completed here -- that taskset would have a task for a partition
+   * that was already completed. We maintain the set of finished partitions in
+   * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
+   * is submitted. See SPARK-25250 for more details.
+   *
+   * note: this method must be called with a lock on this.
    */
   private[scheduler] def markPartitionCompletedInAllTaskSets(
       stageId: Int,
       partitionId: Int,
-      taskInfo: TaskInfo) = {
+      taskInfo: Option[TaskInfo]) = {
     val finishedPartitions =
-      stageIdToFinishedPartitions.getOrElseUpdate(stageId, new HashSet[Int])
+      stageIdToFinishedPartitions.getOrElseUpdate(stageId, new BitSet)
     finishedPartitions += partitionId
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
       tsm.markPartitionCompleted(partitionId, taskInfo)
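
Under the scheme this comment describes, correctness hinges on both paths holding the TaskSchedulerImpl lock: completions recorded by "task-result-getter" must be visible to any taskset that "dag-scheduler-event-loop" submits afterwards. A toy model of that discipline, with illustrative names (not the Spark classes):

```scala
import scala.collection.mutable.{BitSet, HashMap}

// Toy model: the completion path and the submission path both
// synchronize on the same scheduler object.
class ToyScheduler {
  private val finished = new HashMap[Int, BitSet]

  // Completion path ("task-result-getter"): must hold the lock so a
  // concurrently submitted taskset cannot miss this partition.
  def markPartitionCompleted(stageId: Int, partitionId: Int): Unit = synchronized {
    finished.getOrElseUpdate(stageId, new BitSet) += partitionId
  }

  // Submission path ("dag-scheduler-event-loop"): reads the finished set
  // under the same lock, so it observes every prior completion.
  def pendingPartitions(stageId: Int, numPartitions: Int): Seq[Int] = synchronized {
    val done = finished.getOrElse(stageId, BitSet.empty)
    (0 until numPartitions).filterNot(done.contains)
  }
}

object ToySchedulerDemo extends App {
  val sched = new ToyScheduler
  sched.markPartitionCompleted(stageId = 0, partitionId = 7)
  // A later attempt for stage 0 sees partition 7 as already done.
  assert(sched.pendingPartitions(stageId = 0, numPartitions = 10) ==
    (0 until 10).filterNot(_ == 7))
}
```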

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 17 additions & 4 deletions
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
 import scala.math.max
 import scala.util.control.NonFatal
 
@@ -189,6 +189,18 @@ private[spark] class TaskSetManager(
     addPendingTask(i)
   }
 
+  {
+    // A TaskSet submitted by the DAGScheduler may contain some already completed
+    // tasks, since the DAGScheduler does not always know about all the tasks that
+    // other tasksets have completed when it completes a stage. We mark those
+    // tasks as finished here, while holding the TaskSchedulerImpl lock, to avoid
+    // launching duplicate tasks.
+    // See SPARK-25250 and markPartitionCompletedInAllTaskSets().
+    sched.stageIdToFinishedPartitions
+      .getOrElseUpdate(taskSet.stageId, new BitSet)
+      .foreach(markPartitionCompleted(_, None))
+  }
+
   /**
    * Track the set of locality levels which are valid given the tasks locality preferences and
    * the set of currently available executors. This is updated as executors are added and removed.
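
The `{ ... }` wrapper added above is a plain Scala initializer block: statements in a class body execute as part of the primary constructor, in textual order, so the catch-up against stageIdToFinishedPartitions runs exactly once, at construction time (which, per the comment, is while the TaskSchedulerImpl lock is held). A minimal illustration of the construct:

```scala
class Counter(start: Int) {
  private var value = start

  {
    // Initializer block, like the one added above: runs once, during the
    // primary constructor, after the field initializers before it.
    require(start >= 0, "start must be non-negative")
    value += 1 // constructor-time catch-up work
  }

  def current: Int = value
}

object InitializerBlockDemo extends App {
  assert(new Counter(4).current == 5)
}
```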
@@ -786,7 +798,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed. This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, Some(info))
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -797,11 +809,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: Option[TaskInfo])
+    : Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
         if (speculationEnabled && !isZombie) {
-          successfulTaskDurations.insert(taskInfo.duration)
+          taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
         }
         tasksSuccessful += 1
         successful(index) = true
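
Widening `taskInfo` to Option[TaskInfo] covers the new constructor-time path, where no TaskInfo exists because this attempt never ran the task: callers pass Some(info) on a real completion and None during catch-up, and `taskInfo.foreach` makes the speculation bookkeeping a no-op in the None case. A small standalone sketch of the idiom (FakeTaskInfo is a stand-in, not Spark's TaskInfo):

```scala
import scala.collection.mutable.ArrayBuffer

final case class FakeTaskInfo(duration: Long) // stand-in for Spark's TaskInfo

object OptionIdiomDemo extends App {
  val successfulTaskDurations = ArrayBuffer.empty[Long]

  def markPartitionCompleted(taskInfo: Option[FakeTaskInfo]): Unit = {
    // Option.foreach runs the body zero times for None, once for Some: no
    // duration is recorded when the completion came from another attempt.
    taskInfo.foreach { info => successfulTaskDurations += info.duration }
  }

  markPartitionCompleted(None)                      // catch-up path: nothing recorded
  markPartitionCompleted(Some(FakeTaskInfo(1200L))) // real completion: recorded
  assert(successfulTaskDurations.toList == List(1200L))
}
```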

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 33 additions & 71 deletions
@@ -1102,7 +1102,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("Completions in zombie tasksets update status of non-zombie taskset") {
+  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()
 
@@ -1114,9 +1114,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three active task sets for one stage. (For this to really happen,
-    // you'd need the previous stage to also get restarted, and then succeed, in between each
-    // attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three TaskSetManagers (2 zombie, 1 active) for one stage. (For this
+    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
+    // in between each attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -1133,30 +1133,51 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       assert(tsm.runningTasks === 9)
       tsm
     }
+    // We've now got 2 zombie attempts, each with 9 tasks still running, but no active
+    // attempt in the taskScheduler.
+
+    // Finish partitions 1 and 2 by completing their tasks before a new attempt for the same
+    // stage is submitted. This is possible because submitting a new attempt and handling a
+    // successful task happen on two different threads, "dag-scheduler-event-loop" and
+    // "task-result-getter" respectively.
+    (0 until 2).foreach { i =>
+      completeTaskSuccessfully(zombieAttempts(i), i + 1)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
+    }
 
-    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
-    // the stage, but this time with insufficient resources so not all tasks are active.
-
+    // Submit the 3rd attempt, still with 10 tasks: because of the race between the
+    // "task-result-getter" and "dag-scheduler-event-loop" threads, a TaskSet can get
+    // submitted with already completed tasks. This time we offer insufficient resources,
+    // so not all tasks are active.
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
+    // Although finalTsm was submitted after some tasks had succeeded, it learns about the
+    // finished partitions by looking into `stageIdToFinishedPartitions` when it is created,
+    // so it won't launch any duplicate tasks later.
+    (0 until 2).map(_ + 1).foreach { partitionId =>
+      val index = finalTsm.partitionToIndex(partitionId)
+      assert(finalTsm.successful(index))
+    }
+
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
     }.toSet
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We simulate late completions from our zombie tasksets, corresponding to all the pending
-    // partitions in our final attempt. This means we're only waiting on the tasks we've already
-    // launched.
+    // We again simulate late completions from our zombie tasksets (though this time one active
+    // attempt exists in the taskScheduler), corresponding to all the pending partitions in our
+    // final attempt. This means we're only waiting on the tasks we've already launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything. Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
     // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1179,6 +1200,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
         zombieAttempts(partition % 2)
       }
       completeTaskSuccessfully(tsm, partition)
+      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     assert(finalTsm.isZombie)
@@ -1204,67 +1226,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
       verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), any())
     }
-  }
-
-  test("successful tasks from previous attempts could be learnt by later active taskset") {
-    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
-    val valueSer = SparkEnv.get.serializer.newInstance()
-    val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
-
-    // submit a taskset with 10 tasks to taskScheduler
-    val attempt0 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 0)
-    taskScheduler.submitTasks(attempt0)
-    // get the current active tsm
-    val tsm0 = taskScheduler.taskSetManagerForAttempt(0, 0).get
-    // offer sufficient resources
-    val offers0 = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
-    taskScheduler.resourceOffers(offers0)
-    assert(tsm0.runningTasks === 10)
-    // fail task 0.0 and mark tsm0 as zombie
-    tsm0.handleFailedTask(tsm0.taskAttempts(0)(0).taskId, TaskState.FAILED,
-      FetchFailed(null, 0, 0, 0, "fetch failed"))
-    // the attempt0 is a zombie, but the tasks are still running (this could be true even if
-    // we actively killed those tasks, as killing is best-effort)
-    assert(tsm0.isZombie)
-    assert(tsm0.runningTasks === 9)
-
-
-    // success task 1.0 , finish partition 1. But now,
-    // no active tsm exists in TaskScheduler for stage0.
-    tsm0.handleSuccessfulTask(tsm0.taskAttempts(1)(0).taskId, result)
-    assert(tsm0.runningTasks === 8)
-    assert(taskScheduler.stageIdToFinishedPartitions(0).contains(1))
-
-    // submit a new taskset with 10 tasks after someone previous task attempt succeed
-    val attempt1 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
-    taskScheduler.submitTasks(attempt1)
-    // get the current active tsm
-    val tsm1 = taskScheduler.taskSetManagerForAttempt(0, 1).get
-    // tsm1 learns about the finished partition 1 during constructing, so it only need
-    // to execute other 9 tasks
-    assert(tsm1.taskSet.tasks.length == 9)
-    // offer one resource
-    val offers1 = (10 until 11).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
-    taskScheduler.resourceOffers(offers1)
-    assert(tsm1.runningTasks === 1)
-    // success task 0.0 in tsm1 and finish partition 0
-    tsm1.handleSuccessfulTask(tsm1.taskAttempts(0)(0).taskId, result)
-    assert(taskScheduler.stageIdToFinishedPartitions(0).contains(0))
-
-
-    val runningTasks = tsm0.taskSet.tasks.filterNot{ t =>
-      taskScheduler.stageIdToFinishedPartitions(0).contains(t.partitionId)
-    }
-    // finish tsm1 by previous task attempts from tsm0, this remains same behavior with SPARK-23433
-    runningTasks.foreach{ t =>
-      val attempt = tsm0.taskAttempts(tsm0.partitionToIndex(t.partitionId)).head
-      tsm0.handleSuccessfulTask(attempt.taskId, result)
-    }
-
-    assert(taskScheduler.taskSetManagerForAttempt(0, 0).isEmpty)
-    assert(taskScheduler.taskSetManagerForAttempt(0, 1).isEmpty)
     assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
-
   }
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
