
Commit cda20f5

sandflee authored and ash211 committed

Config for hard cpu limit on pods; default unlimited

(cherry picked from commit 8b3248f)

1 parent 01cc091 · commit cda20f5

File tree

4 files changed: +58 -2 lines changed


docs/running-on-kubernetes.md (14 additions, 0 deletions)

```diff
@@ -720,6 +720,20 @@ from the other deployment modes. See the [configuration page](configuration.html
     Docker image pull policy used when pulling Docker images with Kubernetes.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.limit.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the hard cpu limit for the driver pod
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.limit.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the hard cpu limit for a single executor pod
+  </td>
+</tr>
 </table>
 
 
```
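Not part of the commit: a minimal usage sketch with illustrative values, showing how the two new keys would be set on a SparkConf. Both are plain strings and default to unset, i.e. no cpu limit.

```scala
import org.apache.spark.SparkConf

// Hard cpu limits for the driver pod and each executor pod. Leaving either
// key unset means the corresponding pod gets no cpu limit (the new default).
val conf = new SparkConf()
  .set("spark.kubernetes.driver.limit.cores", "1")
  .set("spark.kubernetes.executor.limit.cores", "2")
```

The cpu requests are unchanged and still come from spark.driver.cores and spark.executor.cores; the new keys only cap usage.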

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala (12 additions, 0 deletions)

```diff
@@ -485,6 +485,18 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.driver.limit.cores")
+      .doc("Specify the hard cpu limit for the driver pod")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.executor.limit.cores")
+      .doc("Specify the hard cpu limit for a single executor pod")
+      .stringConf
+      .createOptional
+
   private[spark] def resolveK8sMaster(rawMasterString: String): String = {
     if (!rawMasterString.startsWith("k8s://")) {
       throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
```
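Since both entries end in .createOptional, an unset key reads back as None, which is what gives the "default unlimited" behaviour. A small sketch of that, assuming the constants above are in scope and using an illustrative value:

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()

// Unset: getOption returns None, so the pod-building code never adds a limit.
sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)  // None

// Set: the Some branch in Client.scala below adds a hard cpu limit of 2 cores.
sparkConf.set(KUBERNETES_DRIVER_LIMIT_CORES.key, "2")
sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)  // Some("2")
```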

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala (16 additions, 1 deletion)

```diff
@@ -64,6 +64,7 @@ private[spark] class Client(
 
   // CPU settings
   private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)
 
   // Memory settings
   private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
@@ -139,7 +140,6 @@ private[spark] class Client(
         .endEnv()
       .withNewResources()
         .addToRequests("cpu", driverCpuQuantity)
-        .addToLimits("cpu", driverCpuQuantity)
         .addToRequests("memory", driverMemoryQuantity)
         .addToLimits("memory", driverMemoryLimitQuantity)
         .endResources()
@@ -156,6 +156,21 @@ private[spark] class Client(
       .addToContainers(driverContainer)
       .endSpec()
 
+    driverLimitCores.map {
+      limitCores =>
+        val driverCpuLimitQuantity = new QuantityBuilder(false)
+          .withAmount(limitCores)
+          .build()
+        basePod
+          .editSpec()
+            .editFirstContainer()
+              .editResources
+                .addToLimits("cpu", driverCpuLimitQuantity)
+              .endResources()
+            .endContainer()
+          .endSpec()
+    }
+
     val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider
       .provideInitContainerSubmittedDependencyUploader(allDriverLabels)
       .map { uploader =>
```
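The in-place edit works because fabric8 builders are mutable: endSpec() writes the edited spec back into the builder it was opened on, so the value returned by map can be discarded. A self-contained sketch of the pattern, with hypothetical pod and container names (foreach is used here since only the side effect matters):

```scala
import io.fabric8.kubernetes.api.model.{PodBuilder, QuantityBuilder}

val limitCores: Option[String] = Some("2")  // e.g. from spark.kubernetes.driver.limit.cores

// A pod with a cpu request but, so far, no cpu limit.
val basePod = new PodBuilder()
  .withNewMetadata()
    .withName("spark-driver")  // hypothetical
    .endMetadata()
  .withNewSpec()
    .addNewContainer()
      .withName("spark-kubernetes-driver")  // hypothetical
      .withNewResources()
        .addToRequests("cpu", new QuantityBuilder(false).withAmount("1").build())
        .endResources()
      .endContainer()
    .endSpec()

// Only attach a hard cpu limit when one was configured; with None this block
// never runs and the container keeps only its cpu request.
limitCores.foreach { cores =>
  basePod
    .editSpec()
      .editFirstContainer()
        .editResources()
          .addToLimits("cpu", new QuantityBuilder(false).withAmount(cores).build())
          .endResources()
        .endContainer()
      .endSpec()
}

val pod = basePod.build()
```

The commit's map achieves the same thing for the same reason; foreach just makes the side-effecting intent explicit.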

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala (16 additions, 1 deletion)

```diff
@@ -109,6 +109,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb
 
   private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
+  private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
 
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
@@ -439,14 +440,28 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .addToRequests("memory", executorMemoryQuantity)
         .addToLimits("memory", executorMemoryLimitQuantity)
         .addToRequests("cpu", executorCpuQuantity)
-        .addToLimits("cpu", executorCpuQuantity)
         .endResources()
       .addAllToEnv(requiredEnv.asJava)
       .addToEnv(executorExtraClasspathEnv.toSeq: _*)
       .withPorts(requiredPorts.asJava)
       .endContainer()
       .endSpec()
 
+    executorLimitCores.map {
+      limitCores =>
+        val executorCpuLimitQuantity = new QuantityBuilder(false)
+          .withAmount(limitCores)
+          .build()
+        basePodBuilder
+          .editSpec()
+            .editFirstContainer()
+              .editResources
+                .addToLimits("cpu", executorCpuLimitQuantity)
+              .endResources()
+            .endContainer()
+          .endSpec()
+    }
+
     val withMaybeShuffleConfigPodBuilder = shuffleServiceConfig
       .map { config =>
         config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) =>
```
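As on the driver side, the executor's cpu request still comes from spark.executor.cores while the new key supplies only the limit, so request and limit can differ. A sketch of the pairing with illustrative values:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.cores", "1")                   // cpu *request* per executor pod
  .set("spark.kubernetes.executor.limit.cores", "2")  // hard cpu *limit* per executor pod

val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
val executorLimitCores = conf.getOption("spark.kubernetes.executor.limit.cores")
// executorCores == "1" and executorLimitCores == Some("2"): each executor pod
// is scheduled against 1 cpu but may use up to 2. With the limit key unset,
// executorLimitCores is None and the pod gets no cpu cap at all.
```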
