Skip to content

Commit c7b93b5

Browse files
author
Rui Li
committed
revise patch
1 parent 3d7da02 commit c7b93b5

File tree

2 files changed

+11
-11
lines changed

2 files changed

+11
-11
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,15 @@ private[spark] class TaskSchedulerImpl(
209209
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
210210
SparkEnv.set(sc.env)
211211

212-
val sortedTaskSets = rootPool.getSortedTaskSetQueue
213212
// Mark each slave as alive and remember its hostname
213+
// Also track if new executor is added
214+
var newExecAvail = false
214215
for (o <- offers) {
215216
executorIdToHost(o.executorId) = o.host
216217
if (!executorsByHost.contains(o.host)) {
217218
executorsByHost(o.host) = new HashSet[String]()
218219
executorAdded(o.executorId, o.host)
219-
for (taskSet <- sortedTaskSets) {
220-
taskSet.executorAdded(o.executorId, o.host)
221-
}
220+
newExecAvail = true
222221
}
223222
}
224223

@@ -227,9 +226,13 @@ private[spark] class TaskSchedulerImpl(
227226
// Build a list of tasks to assign to each worker.
228227
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
229228
val availableCpus = shuffledOffers.map(o => o.cores).toArray
229+
val sortedTaskSets = rootPool.getSortedTaskSetQueue
230230
for (taskSet <- sortedTaskSets) {
231231
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
232232
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
233+
if (newExecAvail) {
234+
taskSet.executorAdded()
235+
}
233236
}
234237

235238
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -752,15 +752,12 @@ private[spark] class TaskSetManager(
752752
}
753753

754754
// Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
755-
def executorAdded(execId: String, host: String) {
755+
def executorAdded() {
756756
def newLocAvail(index: Int): Boolean = {
757757
for (loc <- tasks(index).preferredLocations) {
758-
if (execId.equals(loc.executorId.getOrElse(null)) || host.equals(loc.host)) {
759-
return true
760-
}
761-
val availRack = sched.getRackForHost(host)
762-
val prefRack = sched.getRackForHost(loc.host)
763-
if (prefRack.isDefined && prefRack.get.equals(availRack.getOrElse(null))) {
758+
if (sched.hasExecutorsAliveOnHost(loc.host) ||
759+
(loc.executorId.isDefined && sched.isExecutorAlive(loc.executorId.get)) ||
760+
sched.getRackForHost(loc.host).isDefined) {
764761
return true
765762
}
766763
}

0 commit comments

Comments
 (0)