@@ -336,30 +336,49 @@ private[spark] class TaskSetManager(
    * Dequeue a pending task for a given node and return its index and locality level.
    * Only search for tasks matching the given locality constraint.
    *
+   * NOTE: minLocality is used to avoid a duplicate traversal of the task lists (especially
+   * when we pass NOPREF as maxLocality after the other levels have already been searched).
+   *
    * @return An option containing (task index within the task set, locality, is speculative?)
    */
-  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value, Boolean)] =
+  private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value,
+      minLocality: TaskLocality.Value)
+    : Option[(Int, TaskLocality.Value, Boolean)] =
   {
-    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL, false))
+    def withinAllowedLocality(locality: TaskLocality.TaskLocality): Boolean = {
+      TaskLocality.isAllowed(maxLocality, locality) && {
+        if (maxLocality != minLocality) {
+          minLocality < locality
+        } else {
+          true
+        }
+      }
+    }
+
+    if (withinAllowedLocality(TaskLocality.PROCESS_LOCAL)) {
+      for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
+        return Some((index, TaskLocality.PROCESS_LOCAL, false))
+      }
     }

-    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+    if (withinAllowedLocality(TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
         return Some((index, TaskLocality.NODE_LOCAL, false))
       }
+    }
+
+    if (withinAllowedLocality(TaskLocality.NOPREF)) {
       // Look for noPref tasks after NODE_LOCAL to minimize cross-rack traffic
       for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
         return Some((index, TaskLocality.PROCESS_LOCAL, false))
       }
       // find a speculative task if all noPref tasks have been scheduled
-      val specTask = findSpeculativeTask(execId, host, locality).map {
+      val specTask = findSpeculativeTask(execId, host, maxLocality).map {
         case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
       if (specTask != None) return specTask
     }

-    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
+    if (withinAllowedLocality(TaskLocality.RACK_LOCAL)) {
       for {
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
@@ -368,31 +387,53 @@ private[spark] class TaskSetManager(
       }
     }

-    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
+    if (withinAllowedLocality(TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
         return Some((index, TaskLocality.ANY, false))
       }
     }
+
     None
   }
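To make the two-bound filtering concrete, here is a small, self-contained sketch (not the Spark code; the enumeration and isAllowed below are simplified stand-ins) of how the maxLocality/minLocality pair lets a later pass skip the levels an earlier pass already searched:

object LocalityFilterSketch {
  // Simplified stand-in for org.apache.spark.scheduler.TaskLocality
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NOPREF, RACK_LOCAL, ANY = Value
    def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
  }
  import TaskLocality._

  // A level is searched only if it is within maxLocality and, when the two bounds differ,
  // strictly farther away than the minLocality already covered by a previous pass.
  def withinAllowedLocality(maxLocality: Value, minLocality: Value)(locality: Value): Boolean =
    isAllowed(maxLocality, locality) && (maxLocality == minLocality || minLocality < locality)

  def main(args: Array[String]): Unit = {
    // First pass, bounded by NODE_LOCAL: PROCESS_LOCAL and NODE_LOCAL are both searched.
    val firstPass = withinAllowedLocality(NODE_LOCAL, NODE_LOCAL) _
    assert(firstPass(PROCESS_LOCAL) && firstPass(NODE_LOCAL))
    // Second pass with maxLocality = NOPREF and minLocality = NODE_LOCAL: only NOPREF remains.
    val secondPass = withinAllowedLocality(NOPREF, NODE_LOCAL) _
    assert(!secondPass(PROCESS_LOCAL) && !secondPass(NODE_LOCAL) && secondPass(NOPREF))
  }
}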

   /**
    * Respond to an offer of a single executor from the scheduler by finding a task
+   * @param execId the executor ID of the offered resource
+   * @param host the host of the offered resource
+   * @param preferredLocality the maximum locality level at which we want to schedule the tasks
+   * @param bottomLocality the minimum locality level at which we want to schedule the tasks;
+   *                       this parameter is mainly used to avoid duplicate traversals of the
+   *                       task lists once we have determined that certain levels have no
+   *                       candidate tasks
+   * @param allowAdjustPrefLocality mainly used when scheduling noPref tasks, for which we do
+   *                                not want to apply delay scheduling
    */
   def resourceOffer(
       execId: String,
       host: String,
-      preferredLocality: TaskLocality.TaskLocality)
+      preferredLocality: TaskLocality.TaskLocality,
+      bottomLocality: TaskLocality.TaskLocality,
+      allowAdjustPrefLocality: Boolean = true)
     : Option[TaskDescription] =
   {
     if (!isZombie) {
       val curTime = clock.getTime()

       var allowedLocality = getAllowedLocalityLevel(curTime)
+
       if (allowedLocality > preferredLocality) {
-        allowedLocality = preferredLocality // We're not allowed to search for farther-away tasks
+        // We're not allowed to search for farther-away tasks
+        allowedLocality = preferredLocality
       }
-      findTask(execId, host, allowedLocality) match {
+
+      val foundTask = {
+        if (allowAdjustPrefLocality) {
+          findTask(execId, host, allowedLocality, bottomLocality)
+        } else {
+          findTask(execId, host, preferredLocality, bottomLocality)
+        }
+      }
+      foundTask match {
         case Some((index, taskLocality, speculative)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
@@ -433,6 +474,10 @@ private[spark] class TaskSetManager(
           return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
         }
         case _ =>
+          if (preferredLocality != TaskLocality.NOPREF) {
+            return resourceOffer(execId, host, TaskLocality.NOPREF, preferredLocality,
+              allowAdjustPrefLocality = false)
+          }
       }
     }
     None
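The case _ fallback above is what keeps noPref tasks from being starved: when nothing matches the delay-scheduling bound, the offer is retried once with NOPREF as the maximum and the previous preferredLocality as the bottom bound, with delay scheduling bypassed. A hypothetical, much-simplified model of that two-pass flow (invented names and integer levels, not the Spark API):

object NoPrefFallbackSketch {
  // Integer stand-ins for the locality levels, ordered from closest to farthest.
  val PROCESS_LOCAL = 0; val NODE_LOCAL = 1; val NOPREF = 2; val RACK_LOCAL = 3; val ANY = 4

  // Search the pending buckets from level 0 up to max, skipping levels already covered
  // by a previous pass (levels at or below min, unless max == min).
  def findTask(pending: Map[Int, Seq[Int]], max: Int, min: Int): Option[Int] =
    (0 to max).filter(level => max == min || level > min)
      .flatMap(level => pending.getOrElse(level, Nil).headOption)
      .headOption

  // The first pass honours the delay-scheduling bound; if it finds nothing, retry for
  // noPref tasks only, using the first bound as the new bottom.
  def resourceOffer(pending: Map[Int, Seq[Int]], preferred: Int): Option[Int] =
    findTask(pending, preferred, preferred).orElse {
      if (preferred != NOPREF) findTask(pending, NOPREF, preferred) else None
    }

  def main(args: Array[String]): Unit = {
    val pending = Map(NOPREF -> Seq(7))                     // only a noPref task is pending
    assert(resourceOffer(pending, NODE_LOCAL).contains(7))  // found via the NOPREF retry
  }
}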
@@ -634,8 +679,7 @@ private[spark] class TaskSetManager(
   override def executorLost(execId: String, host: String) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)

-    // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
-    // task that used to have locations on only this host might now go to the no-prefs list. Note
+    // Re-enqueue pending tasks for this host based on the status of the cluster. Note
     // that it's okay if we add a task to the same queue twice (if it had multiple preferred
     // locations), because findTaskFromList will skip already-running tasks.
     for (index <- getPendingTasksForExecutor(execId)) {
@@ -666,6 +710,9 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
     }
+    // recalculate valid locality levels and waits when executor is lost
+    myLocalityLevels = computeValidLocalityLevels()
+    localityWaits = myLocalityLevels.map(getLocalityWait)
   }

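The recomputation added above matters because the valid locality levels are derived from where executors are currently alive; stale levels would make delay scheduling wait at levels that can no longer be satisfied. A hypothetical illustration (toy code, not Spark's computeValidLocalityLevels):

object StaleLocalityLevelSketch {
  // Toy version of the level computation: a level stays valid only if some pending
  // task can still be satisfied at that level given the hosts that are alive.
  def validLevels(aliveHosts: Set[String], preferredHosts: Set[String]): Seq[String] = {
    val levels = Seq.newBuilder[String]
    if ((preferredHosts intersect aliveHosts).nonEmpty) levels += "NODE_LOCAL"
    levels += "ANY"
    levels.result()
  }

  def main(args: Array[String]): Unit = {
    assert(validLevels(Set("hostA"), Set("hostA")) == Seq("NODE_LOCAL", "ANY"))
    // After hostA's only executor is lost, NODE_LOCAL must drop out of the valid levels.
    assert(validLevels(Set.empty, Set("hostA")) == Seq("ANY"))
  }
}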
   /**
@@ -725,6 +772,8 @@ private[spark] class TaskSetManager(
   /**
    * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
    * added to queues using addPendingTask.
+   *
+   * NOTE: we do not need to handle NOPREF here, because NOPREF tasks are scheduled as PROCESS_LOCAL.
    */
   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
@@ -747,16 +796,6 @@ private[spark] class TaskSetManager(
   }

   def executorAdded() {
-    def newLocAvail(index: Int): Boolean = {
-      for (loc <- tasks(index).preferredLocations) {
-        if (sched.hasExecutorsAliveOnHost(loc.host) ||
-          (sched.getRackForHost(loc.host).isDefined &&
-            sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
-          return true
-        }
-      }
-      false
-    }
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)
   }