
Commit 98354ca

Sital Kedia authored and Kay Ousterhout committed
[SPARK-13279] Remove O(n^2) operation from scheduler.
This commit removes an unnecessary duplicate check in addPendingTask that meant that scheduling a task set took time proportional to (# tasks)^2.

Author: Sital Kedia <skedia@fb.com>

Closes #11175 from sitalkedia/fix_stuck_driver.

(cherry picked from commit 1e1e31e)
Signed-off-by: Kay Ousterhout <kayousterhout@gmail.com>
1 parent d950891 commit 98354ca
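
The cost the commit message describes comes from guarding every append to a pending list with a linear contains() scan over a growing ArrayBuffer. As a rough illustration only (a standalone sketch, not part of the commit or of Spark), the two patterns look like this:

import scala.collection.mutable.ArrayBuffer

object PendingListSketch {
  def main(args: Array[String]): Unit = {
    val n = 50000

    // Old pattern: guard every append with a linear contains() scan.
    // Each scan is O(current list size), so adding n tasks is O(n^2) overall.
    val deduped = new ArrayBuffer[Int]
    for (index <- 0 until n) {
      if (!deduped.contains(index)) {
        deduped += index
      }
    }

    // New pattern: append unconditionally (amortized O(1) per task, O(n) total)
    // and tolerate duplicates, filtering them out at dequeue time instead.
    val appended = new ArrayBuffer[Int]
    for (index <- 0 until n) {
      appended += index
    }

    println(s"deduped: ${deduped.size} entries, appended: ${appended.size} entries")
  }
}

For a task set with no duplicate insertions, both loops build the same list; the difference is purely the per-insert scan, which is what made large task sets slow to schedule.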

File tree

1 file changed: +13 −15 lines

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 13 additions & 15 deletions
@@ -114,9 +114,14 @@ private[spark] class TaskSetManager(
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
-  // back at the head of the stack. They are also only cleaned up lazily;
-  // when a task is launched, it remains in all the pending lists except
-  // the one that it was launched from, but gets removed from them later.
+  // back at the head of the stack. These collections may contain duplicates
+  // for two reasons:
+  // (1): Tasks are only removed lazily; when a task is launched, it remains
+  // in all the pending lists except the one that it was launched from.
+  // (2): Tasks may be re-added to these lists multiple times as a result
+  // of failures.
+  // Duplicates are handled in dequeueTaskFromList, which ensures that a
+  // task hasn't already started running before launching it.
   private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
 
   // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
@@ -179,23 +184,16 @@ private[spark] class TaskSetManager(
 
   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {
-    // Utility method that adds `index` to a list only if it's not already there
-    def addTo(list: ArrayBuffer[Int]) {
-      if (!list.contains(index)) {
-        list += index
-      }
-    }
-
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
-          addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
+          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
         case e: HDFSCacheTaskLocation => {
           val exe = sched.getExecutorsAliveOnHost(loc.host)
           exe match {
             case Some(set) => {
               for (e <- set) {
-                addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
+                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
               }
               logInfo(s"Pending task $index has a cached location at ${e.host} " +
                 ", where there are executors " + set.mkString(","))
@@ -206,14 +204,14 @@ private[spark] class TaskSetManager(
         }
         case _ => Unit
       }
-      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
       for (rack <- sched.getRackForHost(loc.host)) {
-        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
     }
 
     if (tasks(index).preferredLocations == Nil) {
-      addTo(pendingTasksWithNoPrefs)
+      pendingTasksWithNoPrefs += index
     }
 
     allPendingTasks += index // No point scanning this whole list to find the old task there
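
With the contains() guard gone from addPendingTask, duplicate entries are only filtered out on the dequeue side; the new comment points at dequeueTaskFromList, which skips tasks that have already started. As a rough illustration only (a simplified model with assumed copiesRunning/successful state tracking, not the actual Spark method), a stack-style dequeue that drops stale duplicate entries could look like this:

import scala.collection.mutable.ArrayBuffer

// Hypothetical, trimmed-down model of dequeue-side duplicate handling: entries are
// popped from the end of the pending list, and an entry is only returned if the task
// it refers to has not already started running and has not already finished.
class PendingList(numTasks: Int) {
  private val pending = new ArrayBuffer[Int]
  private val copiesRunning = new Array[Int](numTasks)   // assumed per-task state
  private val successful = new Array[Boolean](numTasks)  // assumed per-task state

  def add(index: Int): Unit = pending += index           // duplicates are allowed

  def dequeue(): Option[Int] = {
    while (pending.nonEmpty) {
      val index = pending.remove(pending.size - 1)        // treat the list as a stack
      if (copiesRunning(index) == 0 && !successful(index)) {
        copiesRunning(index) += 1
        return Some(index)                                // first live copy wins
      }
      // otherwise the entry was a stale duplicate; drop it and keep scanning
    }
    None
  }
}

Because stale entries are discarded as they are popped, each duplicate is visited at most once across all dequeues, so tolerating duplicates at insert time does not reintroduce the quadratic cost.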
