@@ -114,9 +114,14 @@ private[spark] class TaskSetManager(
114
114
// treated as stacks, in which new tasks are added to the end of the
115
115
// ArrayBuffer and removed from the end. This makes it faster to detect
116
116
// tasks that repeatedly fail because whenever a task failed, it is put
117
- // back at the head of the stack. They are also only cleaned up lazily;
118
- // when a task is launched, it remains in all the pending lists except
119
- // the one that it was launched from, but gets removed from them later.
117
+ // back at the head of the stack. These collections may contain duplicates
118
+ // for two reasons:
119
+ // (1): Tasks are only removed lazily; when a task is launched, it remains
120
+ // in all the pending lists except the one that it was launched from.
121
+ // (2): Tasks may be re-added to these lists multiple times as a result
122
+ // of failures.
123
+ // Duplicates are handled in dequeueTaskFromList, which ensures that a
124
+ // task hasn't already started running before launching it.
120
125
private val pendingTasksForExecutor = new HashMap [String , ArrayBuffer [Int ]]
121
126
122
127
// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
@@ -179,23 +184,16 @@ private[spark] class TaskSetManager(
179
184
180
185
/** Add a task to all the pending-task lists that it should be on. */
181
186
private def addPendingTask (index : Int ) {
182
- // Utility method that adds `index` to a list only if it's not already there
183
- def addTo (list : ArrayBuffer [Int ]) {
184
- if (! list.contains(index)) {
185
- list += index
186
- }
187
- }
188
-
189
187
for (loc <- tasks(index).preferredLocations) {
190
188
loc match {
191
189
case e : ExecutorCacheTaskLocation =>
192
- addTo( pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer ))
190
+ pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer ) += index
193
191
case e : HDFSCacheTaskLocation => {
194
192
val exe = sched.getExecutorsAliveOnHost(loc.host)
195
193
exe match {
196
194
case Some (set) => {
197
195
for (e <- set) {
198
- addTo( pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer ))
196
+ pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer ) += index
199
197
}
200
198
logInfo(s " Pending task $index has a cached location at ${e.host} " +
201
199
" , where there are executors " + set.mkString(" ," ))
@@ -206,14 +204,14 @@ private[spark] class TaskSetManager(
206
204
}
207
205
case _ => Unit
208
206
}
209
- addTo( pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer ))
207
+ pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer ) += index
210
208
for (rack <- sched.getRackForHost(loc.host)) {
211
- addTo( pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer ))
209
+ pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer ) += index
212
210
}
213
211
}
214
212
215
213
if (tasks(index).preferredLocations == Nil ) {
216
- addTo( pendingTasksWithNoPrefs)
214
+ pendingTasksWithNoPrefs += index
217
215
}
218
216
219
217
allPendingTasks += index // No point scanning this whole list to find the old task there
0 commit comments