Skip to content

[SPARK-13279] Remove unnecessary duplicate check in addPendingTask fu… #11175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,14 @@ private[spark] class TaskSetManager(
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. They are also only cleaned up lazily;
// when a task is launched, it remains in all the pending lists except
// the one that it was launched from, but gets removed from them later.
// back at the head of the stack. These collections may contain duplicates
// for two reasons:
// (1): Tasks are only removed lazily; when a task is launched, it remains
// in all the pending lists except the one that it was launched from.
// (2): Tasks may be re-added to these lists multiple times as a result
// of failures.
// Duplicates are handled in dequeueTaskFromList, which ensures that a
// task hasn't already started running before launching it.
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
Expand Down Expand Up @@ -179,23 +184,16 @@ private[spark] class TaskSetManager(

/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
// Utility method that adds `index` to a list only if it's not already there
def addTo(list: ArrayBuffer[Int]) {
if (!list.contains(index)) {
list += index
}
}

for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation => {
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) => {
for (e <- set) {
addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
Expand All @@ -206,14 +204,14 @@ private[spark] class TaskSetManager(
}
case _ => Unit
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}

if (tasks(index).preferredLocations == Nil) {
addTo(pendingTasksWithNoPrefs)
pendingTasksWithNoPrefs += index
}

allPendingTasks += index // No point scanning this whole list to find the old task there
Expand Down