Skip to content

Commit 4afbaf0

Browse files
committed
Support configuration spark.scheduler.minRegisteredResourcesRatio in Mesos mode.
1 parent d20559b commit 4afbaf0

File tree

2 files changed

+10
-0
lines changed

2 files changed

+10
-0
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ private[spark] class CoarseMesosSchedulerBackend(
6363
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
6464
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
6565

66+
val totalExpectedCores = conf.getInt("spark.cores.max", 0)
67+
6668
// Cores we have acquired with each Mesos task ID
6769
val coresByTaskId = new HashMap[Int, Int]
6870
var totalCoresAcquired = 0
@@ -333,4 +335,7 @@ private[spark] class CoarseMesosSchedulerBackend(
333335
super.applicationId
334336
}
335337

338+
override def sufficientResourcesRegistered(): Boolean = {
339+
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
340+
}
336341
}

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ private[spark] class MesosSchedulerBackend(
6969
val listenerBus = sc.listenerBus
7070

7171
@volatile var appId: String = _
72+
73+
if (!sc.getConf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
74+
logWarning("spark.scheduler.minRegisteredResourcesRatio is set, "
75+
+ "but it will be ignored in mesos fine-grained mode.")
76+
}
7277

7378
override def start() {
7479
synchronized {

0 commit comments

Comments
 (0)