Skip to content

Commit 3df307a

Browse files
NiharSMarcelo Vanzin
authored and
Marcelo Vanzin
committed
[SPARK-25960][K8S] Support subpath mounting with Kubernetes
## What changes were proposed in this pull request? This PR adds configurations to use subpaths with Spark on k8s. Subpaths (https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath) allow the user to specify a path within a volume to use instead of the volume's root. ## How was this patch tested? Added unit tests. Ran SparkPi on a cluster with event logging pointed at a subpath-mount and verified the driver host created and used the subpath. Closes #23026 from NiharS/k8s_subpath. Authored-by: Nihar Sheth <niharrsheth@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
1 parent 2512a1d commit 3df307a

File tree

9 files changed

+148
-0
lines changed

9 files changed

+148
-0
lines changed

docs/running-on-kubernetes.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ To mount a volume of any of the types above into the driver pod, use the followi
245245
```
246246
--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path=<mount path>
247247
--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly=<true|false>
248+
--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath=<mount subPath>
248249
```
249250

250251
Specifically, `VolumeType` can be one of the following values: `hostPath`, `emptyDir`, and `persistentVolumeClaim`. `VolumeName` is the name you want to use for the volume under the `volumes` field in the pod specification.
@@ -806,6 +807,14 @@ specific to Spark on Kubernetes.
806807
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint</code>.
807808
</td>
808809
</tr>
810+
<tr>
811+
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath</code></td>
812+
<td>(none)</td>
813+
<td>
814+
Specifies a <a href="https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath">subpath</a> to be mounted from the volume into the driver pod.
815+
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint</code>.
816+
</td>
817+
</tr>
809818
<tr>
810819
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
811820
<td>(none)</td>
@@ -830,6 +839,14 @@ specific to Spark on Kubernetes.
830839
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint</code>.
831840
</td>
832841
</tr>
842+
<tr>
843+
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPath</code></td>
844+
<td>(none)</td>
845+
<td>
846+
Specifies a <a href="https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath">subpath</a> to be mounted from the volume into the executor pod.
847+
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint</code>.
848+
</td>
849+
</tr>
833850
<tr>
834851
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
835852
<td>false</td>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ private[spark] object Config extends Logging {
297297
val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
298298
val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
299299
val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
300+
val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
300301
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
301302
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
302303
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ private[spark] case class KubernetesEmptyDirVolumeConf(
3434
private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf](
3535
volumeName: String,
3636
mountPath: String,
37+
mountSubPath: String,
3738
mountReadOnly: Boolean,
3839
volumeConf: T)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,15 @@ private[spark] object KubernetesVolumeUtils {
3939
getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
4040
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
4141
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
42+
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
4243

4344
for {
4445
path <- properties.getTry(pathKey)
4546
volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName)
4647
} yield KubernetesVolumeSpec(
4748
volumeName = volumeName,
4849
mountPath = path,
50+
mountSubPath = properties.get(subPathKey).getOrElse(""),
4951
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
5052
volumeConf = volumeConf
5153
)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ private[spark] class MountVolumesFeatureStep(
5151
val volumeMount = new VolumeMountBuilder()
5252
.withMountPath(spec.mountPath)
5353
.withReadOnly(spec.mountReadOnly)
54+
.withSubPath(spec.mountSubPath)
5455
.withName(spec.volumeName)
5556
.build()
5657

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,18 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
3333
KubernetesHostPathVolumeConf("/hostPath"))
3434
}
3535

36+
test("Parses subPath correctly") {
37+
val sparkConf = new SparkConf(false)
38+
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
39+
sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
40+
sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath")
41+
42+
val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
43+
assert(volumeSpec.volumeName === "volumeName")
44+
assert(volumeSpec.mountPath === "/path")
45+
assert(volumeSpec.mountSubPath === "subPath")
46+
}
47+
3648
test("Parses persistentVolumeClaim volumes correctly") {
3749
val sparkConf = new SparkConf(false)
3850
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
4343
val volumeConf = KubernetesVolumeSpec(
4444
"testVolume",
4545
"/tmp",
46+
"",
4647
false,
4748
KubernetesHostPathVolumeConf("/hostPath/tmp")
4849
)
@@ -62,6 +63,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
6263
val volumeConf = KubernetesVolumeSpec(
6364
"testVolume",
6465
"/tmp",
66+
"",
6567
true,
6668
KubernetesPVCVolumeConf("pvcClaim")
6769
)
@@ -83,6 +85,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
8385
val volumeConf = KubernetesVolumeSpec(
8486
"testVolume",
8587
"/tmp",
88+
"",
8689
false,
8790
KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
8891
)
@@ -104,6 +107,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
104107
val volumeConf = KubernetesVolumeSpec(
105108
"testVolume",
106109
"/tmp",
110+
"",
107111
false,
108112
KubernetesEmptyDirVolumeConf(None, None)
109113
)
@@ -125,12 +129,14 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
125129
val hpVolumeConf = KubernetesVolumeSpec(
126130
"hpVolume",
127131
"/tmp",
132+
"",
128133
false,
129134
KubernetesHostPathVolumeConf("/hostPath/tmp")
130135
)
131136
val pvcVolumeConf = KubernetesVolumeSpec(
132137
"checkpointVolume",
133138
"/checkpoints",
139+
"",
134140
true,
135141
KubernetesPVCVolumeConf("pvcClaim")
136142
)
@@ -142,4 +148,77 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
142148
assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
143149
assert(configuredPod.container.getVolumeMounts.size() === 2)
144150
}
151+
152+
test("Mounts subpath on emptyDir") {
153+
val volumeConf = KubernetesVolumeSpec(
154+
"testVolume",
155+
"/tmp",
156+
"foo",
157+
false,
158+
KubernetesEmptyDirVolumeConf(None, None)
159+
)
160+
val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
161+
val step = new MountVolumesFeatureStep(kubernetesConf)
162+
val configuredPod = step.configurePod(SparkPod.initialPod())
163+
164+
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
165+
val emptyDirMount = configuredPod.container.getVolumeMounts.get(0)
166+
assert(emptyDirMount.getMountPath === "/tmp")
167+
assert(emptyDirMount.getName === "testVolume")
168+
assert(emptyDirMount.getSubPath === "foo")
169+
}
170+
171+
test("Mounts subpath on persistentVolumeClaims") {
172+
val volumeConf = KubernetesVolumeSpec(
173+
"testVolume",
174+
"/tmp",
175+
"bar",
176+
true,
177+
KubernetesPVCVolumeConf("pvcClaim")
178+
)
179+
val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
180+
val step = new MountVolumesFeatureStep(kubernetesConf)
181+
val configuredPod = step.configurePod(SparkPod.initialPod())
182+
183+
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
184+
val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
185+
assert(pvcClaim.getClaimName === "pvcClaim")
186+
assert(configuredPod.container.getVolumeMounts.size() === 1)
187+
val pvcMount = configuredPod.container.getVolumeMounts.get(0)
188+
assert(pvcMount.getMountPath === "/tmp")
189+
assert(pvcMount.getName === "testVolume")
190+
assert(pvcMount.getSubPath === "bar")
191+
}
192+
193+
test("Mounts multiple subpaths") {
194+
val volumeConf = KubernetesEmptyDirVolumeConf(None, None)
195+
val emptyDirSpec = KubernetesVolumeSpec(
196+
"testEmptyDir",
197+
"/tmp/foo",
198+
"foo",
199+
true,
200+
KubernetesEmptyDirVolumeConf(None, None)
201+
)
202+
val pvcSpec = KubernetesVolumeSpec(
203+
"testPVC",
204+
"/tmp/bar",
205+
"bar",
206+
true,
207+
KubernetesEmptyDirVolumeConf(None, None)
208+
)
209+
val kubernetesConf = emptyKubernetesConf.copy(
210+
roleVolumes = emptyDirSpec :: pvcSpec :: Nil)
211+
val step = new MountVolumesFeatureStep(kubernetesConf)
212+
val configuredPod = step.configurePod(SparkPod.initialPod())
213+
214+
assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
215+
val mounts = configuredPod.container.getVolumeMounts
216+
assert(mounts.size() === 2)
217+
assert(mounts.get(0).getName === "testEmptyDir")
218+
assert(mounts.get(0).getMountPath === "/tmp/foo")
219+
assert(mounts.get(0).getSubPath === "foo")
220+
assert(mounts.get(1).getName === "testPVC")
221+
assert(mounts.get(1).getMountPath === "/tmp/bar")
222+
assert(mounts.get(1).getSubPath === "bar")
223+
}
145224
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,40 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
140140
val volumeSpec = KubernetesVolumeSpec(
141141
"volume",
142142
"/tmp",
143+
"",
144+
false,
145+
KubernetesHostPathVolumeConf("/path"))
146+
val conf = KubernetesConf(
147+
new SparkConf(false),
148+
KubernetesDriverSpecificConf(
149+
JavaMainAppResource(None),
150+
"test-app",
151+
"main",
152+
Seq.empty),
153+
"prefix",
154+
"appId",
155+
Map.empty,
156+
Map.empty,
157+
Map.empty,
158+
Map.empty,
159+
Map.empty,
160+
volumeSpec :: Nil,
161+
hadoopConfSpec = None)
162+
validateStepTypesApplied(
163+
builderUnderTest.buildFromFeatures(conf),
164+
BASIC_STEP_TYPE,
165+
CREDENTIALS_STEP_TYPE,
166+
SERVICE_STEP_TYPE,
167+
LOCAL_DIRS_STEP_TYPE,
168+
MOUNT_VOLUMES_STEP_TYPE,
169+
DRIVER_CMD_STEP_TYPE)
170+
}
171+
172+
test("Apply volumes step if a mount subpath is present.") {
173+
val volumeSpec = KubernetesVolumeSpec(
174+
"volume",
175+
"/tmp",
176+
"foo",
143177
false,
144178
KubernetesHostPathVolumeConf("/path"))
145179
val conf = KubernetesConf(

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
107107
val volumeSpec = KubernetesVolumeSpec(
108108
"volume",
109109
"/tmp",
110+
"",
110111
false,
111112
KubernetesHostPathVolumeConf("/checkpoint"))
112113
val conf = KubernetesConf(

0 commit comments

Comments
 (0)