Skip to content

Commit b3a430b

Browse files
committed
remove fine granularity tracking for node-local only tasks
1 parent f9a2ad8 commit b3a430b

File tree

3 files changed

+10
-12
lines changed

3 files changed

+10
-12
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ private[spark] class TaskSchedulerImpl(
249249

250250
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
251251
// of locality levels so that it gets a chance to launch local tasks on all of them.
252-
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY
252+
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
253253
var launchedTask = false
254254
for (taskSet <- sortedTaskSets; preferredLocality <- taskSet.myLocalityLevels) {
255255
do {

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,8 @@ private[spark] class TaskSetManager(
114114
// but at host level.
115115
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
116116

117-
// this collection is mainly for ensuring that the NODE_LOCAL task is always scheduled
118-
// before NOPREF and it contains all NODE_LOCAL and "not-launched" tasks
119-
private[scheduler] val nodeLocalTasks = new HashMap[String, HashSet[Int]]
117+
//private[scheduler] val nodeLocalTasks = new HashMap[String, HashSet[Int]]
118+
private var hasNodeLocalOnlyTasks = true
120119

121120
// Set of pending tasks for each rack -- similar to the above.
122121
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
@@ -194,7 +193,7 @@ private[spark] class TaskSetManager(
194193
}
195194
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
196195
if (loc.executorId == None) {
197-
nodeLocalTasks.getOrElseUpdate(loc.host, new HashSet[Int]) += index
196+
hasNodeLocalOnlyTasks = true
198197
}
199198
for (rack <- sched.getRackForHost(loc.host)) {
200199
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
@@ -401,7 +400,7 @@ private[spark] class TaskSetManager(
401400
*
402401
* NOTE: this function is either called with a real preferredLocality level which
403402
* would be adjusted by delay scheduling algorithm or it will be with a special
404-
* NOPREF locality which will be not modified
403+
* NO_PREF locality which will be not modified
405404
*
406405
* @param execId the executor Id of the offered resource
407406
* @param host the host Id of the offered resource
@@ -418,8 +417,7 @@ private[spark] class TaskSetManager(
418417

419418
var allowedLocality = maxLocality
420419

421-
if (maxLocality != TaskLocality.NO_PREF ||
422-
(nodeLocalTasks.contains(host) && nodeLocalTasks(host).size > 0)) {
420+
if (maxLocality != TaskLocality.NO_PREF || hasNodeLocalOnlyTasks) {
423421
allowedLocality = getAllowedLocalityLevel(curTime)
424422
if (allowedLocality > maxLocality) {
425423
// We're not allowed to search for farther-away tasks
@@ -440,7 +438,7 @@ private[spark] class TaskSetManager(
440438
taskInfos(taskId) = info
441439
taskAttempts(index) = info :: taskAttempts(index)
442440
// Update our locality level for delay scheduling
443-
// NOPREF will not affect the variables related to delay scheduling
441+
// NO_PREF will not affect the variables related to delay scheduling
444442
if (maxLocality != TaskLocality.NO_PREF) {
445443
currentLocalityIndex = getLocalityIndex(taskLocality)
446444
lastLaunchTime = curTime
@@ -468,13 +466,13 @@ private[spark] class TaskSetManager(
468466
taskName, taskId, host, taskLocality, serializedTask.limit))
469467

470468
sched.dagScheduler.taskStarted(task, info)
471-
if (taskLocality <= TaskLocality.NODE_LOCAL) {
469+
/*if (taskLocality <= TaskLocality.NODE_LOCAL) {
472470
for (preferedLocality <- tasks(index).preferredLocations) {
473471
if (nodeLocalTasks.contains(preferedLocality.host)) {
474472
nodeLocalTasks(preferedLocality.host) -= index
475473
}
476474
}
477-
}
475+
}*/
478476
return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
479477
}
480478
case _ =>

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
161161
val clock = new FakeClock
162162
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
163163

164-
// Offer a host with NOPREF as the constraint,
164+
// Offer a host with NO_PREF as the constraint,
165165
// we should get a nopref task immediately since that's all we have
166166
var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
167167
assert(taskOption.isDefined)

0 commit comments

Comments
 (0)