Skip to content

Commit fff4123

Browse files
author
Rui Li
committed
fix computing valid locality levels
1 parent 685ed3d commit fff4123

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ private[spark] class TaskSetManager(
153153
}
154154

155155
// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
156-
val myLocalityLevels = computeValidLocalityLevels()
156+
var myLocalityLevels = computeValidLocalityLevels()
157157
val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
158158

159159
// Delay scheduling variables: we keep track of our current locality level and the time we
@@ -386,7 +386,7 @@ private[spark] class TaskSetManager(
386386
val curTime = clock.getTime()
387387

388388
var allowedLocality = getAllowedLocalityLevel(curTime)
389-
if (allowedLocality > maxLocality) {
389+
if (allowedLocality > maxLocality && myLocalityLevels.contains(maxLocality)) {
390390
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
391391
}
392392

@@ -723,10 +723,12 @@ private[spark] class TaskSetManager(
723723
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
724724
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
725725
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
726-
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
726+
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
727+
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
727728
levels += PROCESS_LOCAL
728729
}
729-
if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
730+
if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
731+
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
730732
levels += NODE_LOCAL
731733
}
732734
if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
@@ -750,5 +752,6 @@ private[spark] class TaskSetManager(
750752
}
751753
logInfo("Re-computing pending task lists.")
752754
pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
755+
myLocalityLevels = computeValidLocalityLevels()
753756
}
754757
}

0 commit comments

Comments
 (0)