4 files changed: +58 −2 lines changed
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes

@@ -718,6 +718,20 @@ from the other deployment modes. See the [configuration page](configuration.html)
   Docker image pull policy used when pulling Docker images with Kubernetes.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.limit.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the hard cpu limit for the driver pod
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.limit.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the hard cpu limit for a single executor pod
+  </td>
+</tr>
 </table>
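For context, a minimal sketch (not part of this diff) of how a user might set the two new properties from application code; the app name and limit values below are illustrative only:

```scala
import org.apache.spark.SparkConf

// Illustrative values: cap the driver pod at 1 CPU and each executor pod at 2 CPUs.
val conf = new SparkConf()
  .setAppName("limit-cores-example") // hypothetical app name
  .set("spark.kubernetes.driver.limit.cores", "1")
  .set("spark.kubernetes.executor.limit.cores", "2")
```

Because both defaults are `(none)`, leaving the properties unset means no hard CPU limit is applied to the pods.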
@@ -485,6 +485,18 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.driver.limit.cores")
+      .doc("Specify the hard cpu limit for the driver pod")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.executor.limit.cores")
+      .doc("Specify the hard cpu limit for a single executor pod")
+      .stringConf
+      .createOptional
+
   private[spark] def resolveK8sMaster(rawMasterString: String): String = {
     if (!rawMasterString.startsWith("k8s://")) {
       throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
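A small sketch (assumptions noted in comments) of how these optional entries behave at read time; `createOptional` produces entries with no default, so an unset property simply comes back as `None`:

```scala
import org.apache.spark.SparkConf

// Sketch only: the same lookup can also be done by key, which is what the Client
// and scheduler backend below do via KUBERNETES_DRIVER_LIMIT_CORES.key and
// KUBERNETES_EXECUTOR_LIMIT_CORES.key.
val sparkConf = new SparkConf()
val driverLimitCores: Option[String] =
  sparkConf.getOption("spark.kubernetes.driver.limit.cores")   // None when unset
val executorLimitCores: Option[String] =
  sparkConf.getOption("spark.kubernetes.executor.limit.cores") // None when unset
```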
@@ -64,6 +64,7 @@ private[spark] class Client(
 
   // CPU settings
   private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)
 
   // Memory settings
   private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
@@ -139,7 +140,6 @@ private[spark] class Client(
       .endEnv()
       .withNewResources()
       .addToRequests("cpu", driverCpuQuantity)
-      .addToLimits("cpu", driverCpuQuantity)
       .addToRequests("memory", driverMemoryQuantity)
       .addToLimits("memory", driverMemoryLimitQuantity)
       .endResources()
@@ -156,6 +156,21 @@ private[spark] class Client(
       .addToContainers(driverContainer)
       .endSpec()
 
+    driverLimitCores.map {
+      limitCores =>
+        val driverCpuLimitQuantity = new QuantityBuilder(false)
+          .withAmount(limitCores)
+          .build()
+        basePod
+          .editSpec()
+          .editFirstContainer()
+          .editResources()
+          .addToLimits("cpu", driverCpuLimitQuantity)
+          .endResources()
+          .endContainer()
+          .endSpec()
+    }
+
     val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider
       .provideInitContainerSubmittedDependencyUploader(allDriverLabels)
       .map { uploader =>
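The added block relies on fabric8's fluent builders: `editSpec()`, `editFirstContainer()`, and `editResources()` open nested builders, and the matching `end...()` calls fold the edits back into the parent, so the chain mutates `basePod` in place and its return value can be ignored. A standalone sketch of the same pattern, with assumed pod/container names and an assumed limit of `"2"` (none of these values come from the diff):

```scala
import io.fabric8.kubernetes.api.model.{PodBuilder, QuantityBuilder}

// Build a pod with a CPU request but no CPU limit, mirroring the base driver pod.
val basePod = new PodBuilder()
  .withNewMetadata()
    .withName("example-driver")             // assumed name
  .endMetadata()
  .withNewSpec()
    .addNewContainer()
      .withName("example-driver-container") // assumed name
      .withNewResources()
        .addToRequests("cpu", new QuantityBuilder(false).withAmount("1").build())
      .endResources()
    .endContainer()
  .endSpec()

// Apply a hard CPU limit only when one is configured; the end...() calls write
// the nested edits back into basePod, so the expression's result is discarded.
val maybeLimitCores: Option[String] = Some("2") // stands in for driverLimitCores
maybeLimitCores.foreach { limitCores =>
  basePod
    .editSpec()
    .editFirstContainer()
    .editResources()
    .addToLimits("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
    .endResources()
    .endContainer()
    .endSpec()
}

val pod = basePod.build()
// pod.getSpec.getContainers.get(0).getResources.getLimits now maps "cpu" to 2.
```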
@@ -108,6 +108,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb
 
   private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
+  private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
 
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
@@ -438,14 +439,28 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .addToRequests("memory", executorMemoryQuantity)
         .addToLimits("memory", executorMemoryLimitQuantity)
         .addToRequests("cpu", executorCpuQuantity)
-        .addToLimits("cpu", executorCpuQuantity)
         .endResources()
         .addAllToEnv(requiredEnv.asJava)
         .addToEnv(executorExtraClasspathEnv.toSeq: _*)
         .withPorts(requiredPorts.asJava)
         .endContainer()
         .endSpec()
 
+    executorLimitCores.map {
+      limitCores =>
+        val executorCpuLimitQuantity = new QuantityBuilder(false)
+          .withAmount(limitCores)
+          .build()
+        basePodBuilder
+          .editSpec()
+          .editFirstContainer()
+          .editResources()
+          .addToLimits("cpu", executorCpuLimitQuantity)
+          .endResources()
+          .endContainer()
+          .endSpec()
+    }
+
     val withMaybeShuffleConfigPodBuilder = shuffleServiceConfig
       .map { config =>
         config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) =>
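The same conditional-limit pattern is applied to the executor pod here. A side note on style: `executorLimitCores.map { ... }` (and `driverLimitCores.map { ... }` above) is used purely for its side effect on the builder, so `foreach` would express the intent more directly; the behavior is the same either way. A hypothetical check, assuming a limit of `"2"` was configured and `basePodBuilder` is in scope, that the limit lands on the built pod:

```scala
// Hypothetical verification sketch; basePodBuilder and the "2" value are assumptions.
val executorPod = basePodBuilder.build()
val cpuLimit = executorPod.getSpec
  .getContainers.get(0)   // assumes the executor container is the first in the spec
  .getResources
  .getLimits
  .get("cpu")
assert(cpuLimit.getAmount == "2")
```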