Skip to content

Commit 3b05870

Browse files
WenboZhaoCurtis Howard
authored andcommitted
Make Cook scheduler respect minRegisteredResourcesRatio (apache#22)
* Make Cook scheduler respect minRegisteredResourcesRatio * More detail comments * address comments * address comments * address comments (cherry picked from commit 2b5b8cd) (cherry picked from commit d39b204) (cherry picked from commit 49a3460) (cherry picked from commit b7e966a)
1 parent 7116927 commit 3b05870

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,17 @@ class CoarseCookSchedulerBackend(
151151

152152
/**
153153
* Note that the mapping between Cook job and executor is 1-1 and onto, thus
154-
* the number of acquired executors equals to the number non-completed jobs.
154+
* the number of requested executors equals to the number non-completed jobs.
155155
*/
156-
private def totalExecutorsAcquired: Int = nonCompletedJobUUIDs.size
156+
private def totalExecutorsRequested: Int = nonCompletedJobUUIDs.size
157+
158+
/**
159+
* We can't use `totalExecutorsRequested` as the number of registered executors because
160+
* the latency between requesting resources from Cook and launching jobs after that.
161+
* However, `getExecutorIds()` only returns the list of registered executor ids and
162+
* thus could be used for querying the number of registered executors.
163+
*/
164+
private def totalExecutorsRegistered: Int = getExecutorIds().length
157165

158166
/**
159167
* The set of UUIDs for the jobs that are aborted intentionally, e.g.
@@ -198,8 +206,10 @@ class CoarseCookSchedulerBackend(
198206
private[this] val mesosSchedulerBackend =
199207
new MesosCoarseGrainedSchedulerBackend(scheduler, sc, "", sc.env.securityManager)
200208

209+
// This is only used by TaskScheduler for checking if the scheduler backend is ready
210+
// before sending the first batch of tasks.
201211
override def sufficientResourcesRegistered(): Boolean =
202-
totalExecutorsAcquired >= executorLimit * schedulerContext.minRegisteredResourceRatio
212+
totalExecutorsRegistered >= executorLimit * schedulerContext.minRegisteredResourceRatio
203213

204214
override def applicationId(): String =
205215
schedulerContext.cookApplicationIdOption.getOrElse(super.applicationId())
@@ -336,8 +346,10 @@ class CoarseCookSchedulerBackend(
336346
val cur = System.currentTimeMillis
337347
if (!ret && cur - lastIsReadyLog > 5000) {
338348
logInfo(
339-
s"Backend is not yet ready. Total acquired executors [$totalExecutorsAcquired] " +
340-
s"vs executor limit [$executorLimit]")
349+
s"Scheduler backend is not yet ready: " +
350+
s"number of requested executors [$totalExecutorsRequested] vs " +
351+
s"number of registered executors [$totalExecutorsRegistered] vs " +
352+
s"executor limit [$executorLimit].")
341353
lastIsReadyLog = cur
342354
}
343355
ret
@@ -438,15 +450,15 @@ class CoarseCookSchedulerBackend(
438450
}
439451

440452
private def shouldRequestExecutors(): Boolean =
441-
totalExecutorsAcquired < executorLimit
453+
totalExecutorsRequested < executorLimit
442454

443455
private def executorStatusMessage(): String =
444-
s"Currently, the total acquired executors is [$totalExecutorsAcquired] " +
456+
s"Currently, the total requested executors is [$totalExecutorsRequested] " +
445457
s"and the executor limit is [$executorLimit]."
446458

447459
private def requestExecutorsIfNecessary(): Unit =
448460
if (shouldRequestExecutors()) {
449-
val requestedExecutors = executorLimit - totalExecutorsAcquired
461+
val requestedExecutors = executorLimit - totalExecutorsRequested
450462

451463
if (requestedExecutors > 0) {
452464
val executorIdAndJob = (1 to requestedExecutors).map { _ =>

core/src/main/scala/org/apache/spark/scheduler/CookSchedulerContext.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ case class CookSchedulerContext(
6363
// ==========================================================================
6464

6565
val minRegisteredResourceRatio: Double = math.min(
66-
1,
67-
conf.getDouble(SPARK_SCHEDULER_MIN_REGISTERED_RESOURCE_RATIO, 0))
66+
1d,
67+
conf.getDouble(SPARK_SCHEDULER_MIN_REGISTERED_RESOURCE_RATIO, 0d))
6868

6969
val isDynamicAllocationEnabled: Boolean =
7070
conf.getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, defaultValue = false)

0 commit comments

Comments
 (0)