
Commit f6823f3

mccheah authored and foxish committed
Allow setting memory on the driver submission server. (#161)
* Allow setting memory on the driver submission server.
* Address comments
* Address comments
1 parent 8336465 commit f6823f3
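From the user's side, the new knob is set like any other Spark conf. A hedged sketch (illustrative values; the property names come from this patch):

import org.apache.spark.SparkConf

// Illustrative values only: give the submission server 512 MiB instead of the
// 256m default, and pin the driver-side overhead rather than relying on the
// max(10%, 384 MiB) fallback.
val conf = new SparkConf()
  .set("spark.driver.memory", "1g")
  .set("spark.kubernetes.driver.submissionServerMemory", "512m")
  .set("spark.kubernetes.driver.memoryOverhead", "512m")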

File tree: 5 files changed (+80 −13 lines)

docs/running-on-kubernetes.md

Lines changed: 17 additions & 1 deletion
@@ -201,13 +201,29 @@ from the other deployment modes. See the [configuration page](configuration.html
   </tr>
   <tr>
     <td><code>spark.kubernetes.executor.memoryOverhead</code></td>
-    <td>executorMemory * 0.10, with minimum of 384 </td>
+    <td>executorMemory * 0.10, with minimum of 384</td>
     <td>
       The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things
       like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size
       (typically 6-10%).
     </td>
   </tr>
+  <tr>
+    <td><code>spark.kubernetes.driver.submissionServerMemory</code></td>
+    <td>256m</td>
+    <td>
+      The amount of memory to allocate for the driver submission server.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kubernetes.driver.memoryOverhead</code></td>
+    <td>(driverMemory + driverSubmissionServerMemory) * 0.10, with minimum of 384</td>
+    <td>
+      The amount of off-heap memory (in megabytes) to be allocated for the driver and the driver submission server. This
+      is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to
+      grow with the driver size (typically 6-10%).
+    </td>
+  </tr>
   <tr>
     <td><code>spark.kubernetes.driver.labels</code></td>
     <td>(none)</td>
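To make the two new table rows concrete, here is a small worked sketch of the documented default (a hypothetical helper, not code from this commit; assumes whole-MiB inputs):

// Hypothetical helper mirroring the documented formula:
// overhead = max(0.10 * (driverMemory + driverSubmissionServerMemory), 384), in MiB.
def defaultDriverOverheadMb(driverMemoryMb: Long, submissionServerMemoryMb: Long = 256L): Long =
  math.max((0.10 * (driverMemoryMb + submissionServerMemoryMb)).toLong, 384L)

defaultDriverOverheadMb(1024L)  // max(128, 384) = 384 MiB
defaultDriverOverheadMb(8192L)  // max(844, 384) = 844 MiB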

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 28 additions & 0 deletions
@@ -64,6 +64,19 @@ private[spark] class Client(
     .map(_.split(","))
     .getOrElse(Array.empty[String])
 
+  // Memory settings
+  private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
+  private val driverSubmitServerMemoryString = sparkConf.get(
+    KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
+    KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
+  private val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb
+  private val memoryOverheadMb = sparkConf
+    .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryMb).toInt,
+      MEMORY_OVERHEAD_MIN))
+  private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb
+
   private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
 
   private val secretBase64String = {
@@ -408,6 +421,12 @@ private[spark] class Client(
       .withPath("/v1/submissions/ping")
       .withNewPort(SUBMISSION_SERVER_PORT_NAME)
       .build()
+    val driverMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverContainerMemoryMb}M")
+      .build()
+    val driverMemoryLimitQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverContainerMemoryWithOverhead}M")
+      .build()
     val driverPod = kubernetesClient.pods().createNew()
       .withNewMetadata()
         .withName(kubernetesAppId)
@@ -442,7 +461,16 @@ private[spark] class Client(
             .withName(ENV_SUBMISSION_SERVER_PORT)
             .withValue(SUBMISSION_SERVER_PORT.toString)
             .endEnv()
+          // Note that SPARK_DRIVER_MEMORY only affects the REST server via spark-class.
+          .addNewEnv()
+            .withName(ENV_DRIVER_MEMORY)
+            .withValue(driverSubmitServerMemoryString)
+            .endEnv()
           .addToEnv(sslConfiguration.sslPodEnvVars: _*)
+          .withNewResources()
+            .addToRequests("memory", driverMemoryQuantity)
+            .addToLimits("memory", driverMemoryLimitQuantity)
+            .endResources()
           .withPorts(containerPorts.asJava)
           .withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
           .endContainer()
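As a sanity check on the wiring above, a standalone sketch of the values the Client now derives (illustrative numbers; assumes spark.driver.memory=1g and the 256m submission-server default):

// Mirrors the new Client fields with concrete numbers (not part of the patch).
val driverMemoryMb = 1024L             // spark.driver.memory = 1g
val driverSubmitServerMemoryMb = 256L  // submissionServerMemory default
val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb           // 1280
val memoryOverheadMb = math.max((0.10 * driverContainerMemoryMb).toInt, 384L)       // 384
val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb  // 1664
// The driver pod therefore requests "1280M" of memory with a "1664M" limit.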

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 22 additions & 1 deletion
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.spark.{SPARK_VERSION => sparkVersion}
 import org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
 
 package object config {
 
@@ -104,7 +105,19 @@ package object config {
         | overheads, etc. This tends to grow with the executor size
         | (typically 6-10%).
       """.stripMargin)
-      .stringConf
+      .bytesConf(ByteUnit.MiB)
+      .createOptional
+
+  private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+      .doc("""
+        | The amount of off-heap memory (in megabytes) to be
+        | allocated for the driver and the driver submission server.
+        | This is memory that accounts for things like VM overheads,
+        | interned strings, other native overheads, etc. This tends
+        | to grow with the driver's memory size (typically 6-10%).
+      """.stripMargin)
+      .bytesConf(ByteUnit.MiB)
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_LABELS =
@@ -177,6 +190,14 @@ package object config {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
+    ConfigBuilder("spark.kubernetes.driver.submissionServerMemory")
+      .doc("""
+        | The amount of memory to allocate for the driver submission server.
+      """.stripMargin)
+      .bytesConf(ByteUnit.MiB)
+      .createWithDefaultString("256m")
+
   private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
     ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
       .doc("""
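Switching from stringConf to bytesConf(ByteUnit.MiB) means these entries now parse any Spark size string and surface a Long in mebibytes. A minimal sketch of the resulting behavior (illustrative values):

// Illustration of reading the new typed entries (value hypothetical).
val conf = new org.apache.spark.SparkConf()
  .set("spark.kubernetes.driver.submissionServerMemory", "1g")
// sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY) now yields 1024L (MiB);
// left unset, createWithDefaultString("256m") makes it 256L.
// KUBERNETES_DRIVER_MEMORY_OVERHEAD is createOptional, so it reads as Option[Long].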

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 3 additions & 0 deletions
@@ -63,6 +63,7 @@ package object constants {
   private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
   private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
   private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+  private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
 
   // Annotation keys
   private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =
@@ -74,4 +75,6 @@ package object constants {
   private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
   private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
+  private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
+  private[spark] val MEMORY_OVERHEAD_MIN = 384L
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 10 additions & 11 deletions
@@ -60,15 +60,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .getOrElse(
       throw new SparkException("Must specify the driver pod name"))
 
-  private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
-  private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
+  private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = conf.get(
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
 
-  private val memoryOverheadBytes = conf
+  private val memoryOverheadMb = conf
     .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
-    .map(overhead => Utils.byteStringAsBytes(overhead))
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt,
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt,
       MEMORY_OVERHEAD_MIN))
-  private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes
+  private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb
 
   private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
 
@@ -165,10 +166,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
       SPARK_APP_ID_LABEL -> applicationId()).asJava
     val executorMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(executorMemoryBytes.toString)
+      .withAmount(s"${executorMemoryMb}M")
       .build()
     val executorMemoryLimitQuantity = new QuantityBuilder(false)
-      .withAmount(executorMemoryWithOverhead.toString)
+      .withAmount(s"${executorMemoryWithOverhead}M")
       .build()
     val executorCpuQuantity = new QuantityBuilder(false)
       .withAmount(executorCores)
@@ -177,7 +178,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       (ENV_EXECUTOR_CORES, executorCores),
-      (ENV_EXECUTOR_MEMORY, executorMemory),
+      (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId()),
       (ENV_EXECUTOR_ID, executorId)
     ).map(env => new EnvVarBuilder()
@@ -261,7 +262,5 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
 private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
-  private val MEMORY_OVERHEAD_FACTOR = 0.10
-  private val MEMORY_OVERHEAD_MIN = 384L
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
 }
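The executor path now mirrors the driver: sizes are tracked in MiB and rendered with an explicit M suffix rather than passed to QuantityBuilder as raw byte counts. With Spark's stock spark.executor.memory default of 1g and no explicit overhead, the quantities work out as follows (illustrative sketch, not part of the patch):

val executorMemoryMb = 1024L                                            // spark.executor.memory = 1g
val memoryOverheadMb = math.max((0.10 * executorMemoryMb).toInt, 384L)  // 384
val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb    // 1408
// Executor pod: memory request "1024M", limit "1408M".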
