Commit 5ff1b9b

Andrew Korzhuev authored and Felix Cheung committed
[SPARK-23529][K8S] Support mounting volumes
This PR continues #21095 and intersects with #21238. I've added volume mounts as a separate step and added PersistentVolumeClaim support.

There is a fundamental problem with how we pass the options through Spark conf to fabric8. For each volume type and all possible volume options we would have to implement custom code to map config values to fabric8 calls. This will result in a large body of code that we would have to support, and it means that Spark will always be somewhat out of sync with k8s. I think there needs to be a discussion on how to proceed correctly (e.g. use PodPreset instead).

----

Due to the complications of provisioning and managing actual resources, this PR addresses only volume mounting of already present resources.

----

- [x] emptyDir support
- [x] Testing
- [x] Documentation
- [x] KubernetesVolumeUtils tests

Author: Andrew Korzhuev <andrew.korzhuev@klarna.com>
Author: madanadit <adit@alluxio.com>

Closes #21260 from andrusha/k8s-vol.
1 parent 74a8d63 commit 5ff1b9b

25 files changed (+705 -51 lines)

docs/running-on-kubernetes.md

Lines changed: 48 additions & 0 deletions
@@ -629,6 +629,54 @@ specific to Spark on Kubernetes.
     Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key <code> key </code> in the data of the referenced <a href="https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables">Kubernetes Secret</a>. For example,
     <code>spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key</code>.
   </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path</code></td>
+  <td>(none)</td>
+  <td>
+    Add the <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> named <code>VolumeName</code> of the <code>VolumeType</code> type to the driver pod on the path specified in the value. For example,
+    <code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
+  <td>(none)</td>
+  <td>
+    Specify if the mounted volume is read only or not. For example,
+    <code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName]</code></td>
+  <td>(none)</td>
+  <td>
+    Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> options passed to the Kubernetes with <code>OptionName</code> as key having specified value, must conform with Kubernetes option format. For example,
+    <code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
+  <td>(none)</td>
+  <td>
+    Add the <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> named <code>VolumeName</code> of the <code>VolumeType</code> type to the executor pod on the path specified in the value. For example,
+    <code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
+  <td>false</td>
+  <td>
+    Specify if the mounted volume is read only or not. For example,
+    <code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].options.[OptionName]</code></td>
+  <td>(none)</td>
+  <td>
+    Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> options passed to the Kubernetes with <code>OptionName</code> as key having specified value. For example,
+    <code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim</code>.
+  </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.memoryOverheadFactor</code></td>
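
As a rough illustration of how the properties documented in this hunk fit together, the sketch below assembles the three driver-side keys for a PersistentVolumeClaim and renders them as `--conf` flags. `VolumeConfExample`, `volumeKey`, and `asSubmitFlags` are hypothetical names used only for this example and are not part of Spark; the volume name, mount path, and claim name are the ones used in the documentation examples above.

```scala
object VolumeConfExample {
  // Hypothetical helper (not a Spark API): joins the documented key segments
  // spark.kubernetes.<role>.volumes.<VolumeType>.<VolumeName>.<suffix>
  def volumeKey(role: String, volumeType: String, volumeName: String, suffix: String): String =
    s"spark.kubernetes.$role.volumes.$volumeType.$volumeName.$suffix"

  // Settings for mounting an already existing claim into the driver pod.
  val driverPvcConf: Seq[(String, String)] = Seq(
    volumeKey("driver", "persistentVolumeClaim", "checkpointpvc", "mount.path") -> "/checkpoint",
    volumeKey("driver", "persistentVolumeClaim", "checkpointpvc", "mount.readOnly") -> "false",
    volumeKey("driver", "persistentVolumeClaim", "checkpointpvc", "options.claimName") -> "spark-pvc-claim"
  )

  // Render each setting in the form accepted by spark-submit's --conf option.
  def asSubmitFlags(conf: Seq[(String, String)]): Seq[String] =
    conf.map { case (key, value) => s"--conf $key=$value" }

  def main(args: Array[String]): Unit =
    asSubmitFlags(driverPvcConf).foreach(println)
}
```

The same three keys with `executor` in place of `driver` would describe the executor-side mount.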

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 12 additions & 0 deletions
@@ -220,11 +220,23 @@ private[spark] object Config extends Logging {
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
   val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef."
+  val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes."

   val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
   val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
   val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."
+  val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes."
+
+  val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
+  val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
+  val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
+  val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
+  val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
+  val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
+  val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
+  val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
+  val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"

   val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 11 additions & 0 deletions
@@ -59,6 +59,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     roleSecretNamesToMountPaths: Map[String, String],
     roleSecretEnvNamesToKeyRefs: Map[String, String],
     roleEnvs: Map[String, String],
+    roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
     sparkFiles: Seq[String]) {

   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
@@ -155,6 +156,12 @@ private[spark] object KubernetesConf {
       sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
     val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
+    val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
+      sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get)
+    // Also parse executor volumes in order to verify configuration
+    // before the driver pod is created
+    KubernetesVolumeUtils.parseVolumesWithPrefix(
+      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

     val sparkFiles = sparkConf
       .getOption("spark.files")
@@ -171,6 +178,7 @@ private[spark] object KubernetesConf {
       driverSecretNamesToMountPaths,
       driverSecretEnvNamesToKeyRefs,
       driverEnvs,
+      driverVolumes,
       sparkFiles)
   }

@@ -203,6 +211,8 @@ private[spark] object KubernetesConf {
     val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
     val executorEnv = sparkConf.getExecutorEnv.toMap
+    val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
+      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

     KubernetesConf(
       sparkConf.clone(),
@@ -214,6 +224,7 @@ private[spark] object KubernetesConf {
       executorMountSecrets,
       executorEnvSecrets,
       executorEnv,
+      executorVolumes,
       Seq.empty[String])
   }
 }
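
The `.map(_.get)` calls in the hunks above turn a collection of `Try` results into plain values and rethrow the first failure, which is how a bad executor volume setting can surface before the driver pod is created. The following is a minimal sketch of that pattern in isolation, assuming a strict collection such as `Seq` or `Set`; `FailFastSketch` and `unwrapAll` are hypothetical names, not Spark code.

```scala
import scala.util.{Failure, Success, Try}

object FailFastSketch {
  // Unwrap every result. Try#get rethrows the wrapped exception, so the first
  // Failure aborts the traversal. This relies on `results` being strict; a
  // lazy view would defer the exception until it is consumed.
  def unwrapAll[A](results: Iterable[Try[A]]): Iterable[A] = results.map(_.get)

  def main(args: Array[String]): Unit = {
    val valid: Seq[Try[String]] = Seq(Success("/checkpoint"), Success("/data"))
    println(unwrapAll(valid).mkString(", "))

    val broken: Seq[Try[String]] =
      Seq(Success("/checkpoint"), Failure(new NoSuchElementException("mount.path")))
    // Wrapped in Try only so the demo reports the error instead of crashing.
    println(Try(unwrapAll(broken)).isFailure)
  }
}
```

One consequence of this design is that only the first error is reported; collecting all failures would need a different combinator than `map(_.get)`.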

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 0 additions & 2 deletions
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.k8s

-import io.fabric8.kubernetes.api.model.LocalObjectReference
-
 import org.apache.spark.SparkConf
 import org.apache.spark.util.Utils

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+private[spark] sealed trait KubernetesVolumeSpecificConf
+
+private[spark] case class KubernetesHostPathVolumeConf(
+    hostPath: String)
+  extends KubernetesVolumeSpecificConf
+
+private[spark] case class KubernetesPVCVolumeConf(
+    claimName: String)
+  extends KubernetesVolumeSpecificConf
+
+private[spark] case class KubernetesEmptyDirVolumeConf(
+    medium: Option[String],
+    sizeLimit: Option[String])
+  extends KubernetesVolumeSpecificConf
+
+private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf](
+    volumeName: String,
+    mountPath: String,
+    mountReadOnly: Boolean,
+    volumeConf: T)
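
Because the volume-specific configuration is a sealed trait, code that consumes a spec can pattern match on it and have the compiler check that every volume type is handled. The sketch below shows that idea with local stand-in types that mirror the shapes in this file; `VolumeSpecSketch`, the shortened class names, and `describe` are all hypothetical and exist only for this illustration.

```scala
object VolumeSpecSketch {
  // Local stand-ins mirroring the case classes above (without private[spark]).
  sealed trait VolumeConf
  final case class HostPathConf(hostPath: String) extends VolumeConf
  final case class PvcConf(claimName: String) extends VolumeConf
  final case class EmptyDirConf(medium: Option[String], sizeLimit: Option[String])
    extends VolumeConf

  final case class VolumeSpec[T <: VolumeConf](
      volumeName: String,
      mountPath: String,
      mountReadOnly: Boolean,
      volumeConf: T)

  // Hypothetical consumer: summarizes a spec as one line of text.
  def describe(spec: VolumeSpec[_ <: VolumeConf]): String = {
    // Ascribing the sealed parent type lets the compiler check exhaustiveness.
    val source = (spec.volumeConf: VolumeConf) match {
      case HostPathConf(path) => s"hostPath $path"
      case PvcConf(claim) => s"claim $claim"
      case EmptyDirConf(medium, sizeLimit) =>
        val m = medium.getOrElse("default")
        val l = sizeLimit.getOrElse("none")
        s"emptyDir(medium=$m, sizeLimit=$l)"
    }
    val mode = if (spec.mountReadOnly) "ro" else "rw"
    s"${spec.volumeName}: $source -> ${spec.mountPath} ($mode)"
  }

  def main(args: Array[String]): Unit =
    println(describe(VolumeSpec("checkpointpvc", "/checkpoint", false, PvcConf("spark-pvc-claim"))))
}
```

Adding a new volume type to the sealed hierarchy would then produce a non-exhaustive match warning at each such consumer.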
Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.NoSuchElementException
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+
+private[spark] object KubernetesVolumeUtils {
+  /**
+   * Extract Spark volume configuration properties with a given name prefix.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @return a Map storing with volume name as key and spec as value
+   */
+  def parseVolumesWithPrefix(
+      sparkConf: SparkConf,
+      prefix: String): Iterable[Try[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]] = {
+    val properties = sparkConf.getAllWithPrefix(prefix).toMap
+
+    getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
+      val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
+      val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
+
+      for {
+        path <- properties.getTry(pathKey)
+        volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName)
+      } yield KubernetesVolumeSpec(
+        volumeName = volumeName,
+        mountPath = path,
+        mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
+        volumeConf = volumeConf
+      )
+    }
+  }
+
+  /**
+   * Get unique pairs of volumeType and volumeName,
+   * assuming options are formatted in this way:
+   * `volumeType`.`volumeName`.`property` = `value`
+   * @param properties flat mapping of property names to values
+   * @return Set[(volumeType, volumeName)]
+   */
+  private def getVolumeTypesAndNames(
+      properties: Map[String, String]
+  ): Set[(String, String)] = {
+    properties.keys.flatMap { k =>
+      k.split('.').toList match {
+        case tpe :: name :: _ => Some((tpe, name))
+        case _ => None
+      }
+    }.toSet
+  }
+
+  private def parseVolumeSpecificConf(
+      options: Map[String, String],
+      volumeType: String,
+      volumeName: String): Try[KubernetesVolumeSpecificConf] = {
+    volumeType match {
+      case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
+        val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
+        for {
+          path <- options.getTry(pathKey)
+        } yield KubernetesHostPathVolumeConf(path)
+
+      case KUBERNETES_VOLUMES_PVC_TYPE =>
+        val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
+        for {
+          claimName <- options.getTry(claimNameKey)
+        } yield KubernetesPVCVolumeConf(claimName)
+
+      case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
+        val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
+        val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
+        Success(KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey)))
+
+      case _ =>
+        Failure(new RuntimeException(s"Kubernetes Volume type `$volumeType` is not supported"))
+    }
+  }
+
+  /**
+   * Convenience wrapper to accumulate key lookup errors
+   */
+  implicit private class MapOps[A, B](m: Map[A, B]) {
+    def getTry(key: A): Try[B] = {
+      m
+        .get(key)
+        .fold[Try[B]](Failure(new NoSuchElementException(key.toString)))(Success(_))
+    }
+  }
+}
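
To see the parsing flow without a Spark dependency, the sketch below re-implements the same steps over a plain `Map[String, String]` whose keys already have the `spark.kubernetes.<role>.volumes.` prefix stripped. It is a simplified approximation under that assumption, not the code from this commit: `VolumeParsingSketch`, its stand-in case classes, and the choice to key results by `(volumeType, volumeName)` are all hypothetical.

```scala
import scala.util.{Failure, Success, Try}

object VolumeParsingSketch {
  // Simplified stand-ins for the commit's volume conf classes.
  sealed trait VolumeConf
  final case class HostPath(path: String) extends VolumeConf
  final case class Pvc(claimName: String) extends VolumeConf
  final case class EmptyDir(medium: Option[String], sizeLimit: Option[String]) extends VolumeConf
  final case class VolumeSpec(name: String, mountPath: String, readOnly: Boolean, conf: VolumeConf)

  // A missing key becomes a Failure naming that key.
  private def getTry(m: Map[String, String], key: String): Try[String] =
    m.get(key).fold[Try[String]](Failure(new NoSuchElementException(key)))(Success(_))

  // Keys look like <volumeType>.<volumeName>.<property>; collect the unique pairs.
  def typesAndNames(props: Map[String, String]): Set[(String, String)] =
    props.keys.flatMap { k =>
      k.split('.').toList match {
        case tpe :: name :: _ => Some((tpe, name))
        case _ => None
      }
    }.toSet

  def parse(props: Map[String, String]): Map[(String, String), Try[VolumeSpec]] =
    typesAndNames(props).map { case (tpe, name) =>
      val base = s"$tpe.$name"
      val conf: Try[VolumeConf] = tpe match {
        case "hostPath" => getTry(props, s"$base.options.path").map(HostPath(_))
        case "persistentVolumeClaim" => getTry(props, s"$base.options.claimName").map(Pvc(_))
        case "emptyDir" =>
          Success(EmptyDir(props.get(s"$base.options.medium"), props.get(s"$base.options.sizeLimit")))
        case other =>
          Failure(new RuntimeException(s"Kubernetes Volume type `$other` is not supported"))
      }
      val spec = for {
        path <- getTry(props, s"$base.mount.path")
        c <- conf
      } yield VolumeSpec(name, path, props.get(s"$base.mount.readOnly").exists(_.toBoolean), c)
      (tpe, name) -> spec
    }.toMap

  def main(args: Array[String]): Unit = {
    val props = Map(
      "persistentVolumeClaim.checkpointpvc.mount.path" -> "/checkpoint",
      "persistentVolumeClaim.checkpointpvc.options.claimName" -> "spark-pvc-claim",
      "hostPath.logs.mount.path" -> "/logs" // options.path is missing, so this one fails
    )
    parse(props).foreach { case (key, result) => println(s"$key -> $result") }
  }
}
```

As in the commit, `mount.readOnly` falls back to `false` when absent, and a `hostPath` volume without `options.path` yields a `Failure` rather than a partially filled spec.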

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 3 additions & 2 deletions
@@ -19,10 +19,10 @@ package org.apache.spark.deploy.k8s.features
 import scala.collection.JavaConverters._
 import scala.collection.mutable

-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._

 import org.apache.spark.SparkException
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
@@ -103,6 +103,7 @@ private[spark] class BasicDriverFeatureStep(
         .addToImagePullSecrets(conf.imagePullSecrets(): _*)
         .endSpec()
       .build()
+
     SparkPod(driverPod, driverContainer)
   }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 3 additions & 2 deletions
@@ -18,10 +18,10 @@ package org.apache.spark.deploy.k8s.features

 import scala.collection.JavaConverters._

-import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._

 import org.apache.spark.SparkException
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
@@ -173,6 +173,7 @@ private[spark] class BasicExecutorFeatureStep(
         .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
         .endSpec()
       .build()
+
     SparkPod(executorPod, containerWithLimitCores)
   }
