Skip to content

Commit d11af64

Browse files
wangyum authored and GitHub Enterprise committed
[CARMEL-7374][CARMEL-5347] Fair task scheduler consider user resource usage (apache#118)
1 parent f83808a commit d11af64

File tree

6 files changed

+47
-6
lines changed

6 files changed

+47
-6
lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2102,6 +2102,14 @@ package object config {
21022102
.timeConf(TimeUnit.MILLISECONDS)
21032103
.createOptional
21042104

2105+
// Internal boolean setting, keyed "spark.scheduler.fair.userLevel", that defaults to true.
// Per its own doc string it only matters when the scheduler mode is fair: it makes the task
// scheduler take a user's running tasks into account rather than only the task-set level.
private[spark] val SCHEDULER_FAIR_USER_LEVEL_ENABLED =
2106+
ConfigBuilder("spark.scheduler.fair.userLevel")
2107+
.internal()
2108+
.doc("If set true, and scheduler mode set to fair, " +
2109+
"task scheduler will consider user running tasks instead of only task set level")
2110+
.booleanConf
2111+
.createWithDefault(true)
2112+
21052113
private[spark] val SPECULATION_ENABLED =
21062114
ConfigBuilder("spark.speculation")
21072115
.version("0.6.0")

core/src/main/scala/org/apache/spark/scheduler/AnalyticsTaskSchedulerImpl.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.apache.spark.internal.config
3838
import org.apache.spark.internal.config._
3939
import org.apache.spark.resource.ResourceUtils
4040
import org.apache.spark.scheduler.TaskLocality.TaskLocality
41+
import org.apache.spark.scheduler.TaskSchedulerImpl.SCHEDULER_MODE_PROPERTY
4142
import org.apache.spark.storage.BlockManagerId
4243
import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
4344

@@ -100,6 +101,26 @@ private[spark] class AnalyticsTaskSchedulerImpl(
100101
private val FURTHER_SCHEDULE_LOCK_WAIT_TIME = conf.
101102
getTimeAsMs("spark.task.furtherScheduledLockWaitTime", "5s")
102103

104+
// Value of SCHEDULER_FAIR_USER_LEVEL_ENABLED read once from the scheduler's conf; initialize()
// uses it to decide whether the fair builder is given sc.userResourceManager.
private val userLevelFairScheduledEnabled = conf.get(SCHEDULER_FAIR_USER_LEVEL_ENABLED)
105+
106+
/**
 * Stores the scheduler backend, selects the schedulable builder for the configured
 * scheduling mode and then builds the pools.
 *
 *  - FIFO: a `FIFOSchedulableBuilder` over `rootPool`.
 *  - FAIR: a `FairSchedulableBuilder` over `rootPool`; it is handed `sc.userResourceManager`
 *    only when `userLevelFairScheduledEnabled` is true, otherwise `None`, so user-level
 *    fairness can be switched off by configuration.
 *  - any other mode: rejected with an `IllegalArgumentException`.
 *
 * NOTE(review): `sc.userResourceManager` is presumably an `Option[UserResourceManager]`
 * (it shares a branch with `None`) -- confirm against SparkContext.
 *
 * @param backend the scheduler backend assigned to `this.backend`
 * @throws IllegalArgumentException if the scheduling mode is neither FIFO nor FAIR
 */
override def initialize(backend: SchedulerBackend): Unit = {
107+
this.backend = backend
108+
schedulableBuilder = {
109+
schedulingMode match {
110+
case SchedulingMode.FIFO =>
111+
new FIFOSchedulableBuilder(rootPool)
112+
case SchedulingMode.FAIR =>
113+
val userResourceManager =
114+
if (userLevelFairScheduledEnabled) sc.userResourceManager else None
115+
new FairSchedulableBuilder(rootPool, sc, userResourceManager)
116+
case _ =>
117+
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
118+
s"$schedulingMode")
119+
}
120+
}
121+
schedulableBuilder.buildPools()
122+
}
123+
103124
override def start(): Unit = {
104125
backend.start()
105126

core/src/main/scala/org/apache/spark/scheduler/AnalyticsTaskSetManager.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ private[spark] class AnalyticsTaskSetManager(
7272

7373
override def userInfo(): Option[UserInfo] = _userInfo
7474

75+
// The `user` field of this task set's optional user info; None when no user info is present.
override def user: Option[String] = _userInfo.map(info => info.user)
76+
7577
// Add all our tasks to the pending lists. We do this in reverse order
7678
// of task index so that tasks with low indices get launched first.
7779
addPendingTasks()

core/src/main/scala/org/apache/spark/scheduler/Pool.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ private[spark] class Pool(
3232
val poolName: String,
3333
val schedulingMode: SchedulingMode,
3434
initMinShare: Int,
35-
initWeight: Int)
35+
initWeight: Int,
36+
userResourceManager: Option[UserResourceManager] = None)
3637
extends Schedulable with Logging {
3738

3839
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
@@ -50,7 +51,7 @@ private[spark] class Pool(
5051
private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
5152
schedulingMode match {
5253
case SchedulingMode.FAIR =>
53-
new FairSchedulingAlgorithm()
54+
new FairSchedulingAlgorithm(userResourceManager)
5455
case SchedulingMode.FIFO =>
5556
new FIFOSchedulingAlgorithm()
5657
case _ =>

core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
5656
}
5757
}
5858

59-
private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
59+
private[spark] class FairSchedulableBuilder(
60+
val rootPool: Pool, sc: SparkContext, userResourceManager: Option[UserResourceManager] = None)
6061
extends SchedulableBuilder with Logging {
6162

6263
val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE)
@@ -135,7 +136,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
135136
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
136137
DEFAULT_WEIGHT, fileName)
137138

138-
rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
139+
rootPool.addSchedulable(
140+
new Pool(poolName, schedulingMode, minShare, weight, userResourceManager))
139141

140142
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
141143
poolName, schedulingMode, minShare, weight))

core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
4040
}
4141
}
4242

43-
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
43+
private[spark] class FairSchedulingAlgorithm(
44+
userResourceManager: Option[UserResourceManager]) extends SchedulingAlgorithm {
4445
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
4546
val minShare1 = s1.minShare
4647
val minShare2 = s2.minShare
@@ -61,7 +62,13 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
6162
} else if (s1Needy && s2Needy) {
6263
compare = minShareRatio1.compareTo(minShareRatio2)
6364
} else {
64-
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
65+
if (userResourceManager.isDefined && s1.user.isDefined && s2.user.isDefined) {
66+
val r = userResourceManager.get
67+
compare = r.occupiedCpus(s1.user.get).compareTo(r.occupiedCpus(s2.user.get))
68+
}
69+
if (compare == 0) {
70+
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
71+
}
6572
}
6673
if (compare < 0) {
6774
true

0 commit comments

Comments
 (0)