File tree Expand file tree Collapse file tree 2 files changed +7
-7
lines changed
core/src/main/scala/org/apache/spark/scheduler Expand file tree Collapse file tree 2 files changed +7
-7
lines changed Original file line number Diff line number Diff line change @@ -212,7 +212,7 @@ private[spark] class TaskSchedulerImpl(
212212 SparkEnv .set(sc.env)
213213
214214 // Mark each slave as alive and remember its hostname
215- // also track if new executor is added
215+ // Also track if new executor is added
216216 var newExecAvail = false
217217 for (o <- offers) {
218218 executorIdToHost(o.executorId) = o.host
@@ -232,15 +232,15 @@ private[spark] class TaskSchedulerImpl(
232232 for (taskSet <- sortedTaskSets) {
233233 logDebug(" parentName: %s, name: %s, runningTasks: %s" .format(
234234 taskSet.parent.name, taskSet.name, taskSet.runningTasks))
235+ if (delaySchedule && newExecAvail) {
236+ taskSet.reAddPendingTasks()
237+ }
235238 }
236239
237240 // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
238241 // of locality levels so that it gets a chance to launch local tasks on all of them.
239242 var launchedTask = false
240243 for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality .values) {
241- if (delaySchedule && newExecAvail) {
242- taskSet.reAddPendingTasks()
243- }
244244 do {
245245 launchedTask = false
246246 for (i <- 0 until shuffledOffers.size) {
Original file line number Diff line number Diff line change @@ -196,8 +196,7 @@ private[spark] class TaskSetManager(
196196 }
197197 }
198198
199- if (tasks(index).preferredLocations.isEmpty ||
200- (! delaySchedule && ! hadAliveLocations)) {
199+ if (tasks(index).preferredLocations.isEmpty || (! delaySchedule && ! hadAliveLocations)) {
201200 // Even though the task might've had preferred locations, all of those hosts or executors
202201 // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
203202 addTo(pendingTasksWithNoPrefs)
@@ -744,7 +743,8 @@ private[spark] class TaskSetManager(
744743 // Re-compute the pending lists. This should be called when new executor is added
745744 def reAddPendingTasks () {
746745 logInfo(" Re-computing pending task lists." )
747- for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && ! successful(index))) {
746+ for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0
747+ && ! successful(index))) {
748748 addPendingTask(i, readding = true )
749749 }
750750 }
You can’t perform that action at this time.
0 commit comments