Skip to content

[SPARK-25960][k8s] Support subpath mounting with Kubernetes #23026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ To mount a volume of any of the types above into the driver pod, use the followi
```
--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path=<mount path>
--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly=<true|false>
--conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath=<mount subPath>
```

Specifically, `VolumeType` can be one of the following values: `hostPath`, `emptyDir`, and `persistentVolumeClaim`. `VolumeName` is the name you want to use for the volume under the `volumes` field in the pod specification.
Expand Down Expand Up @@ -780,6 +781,14 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath</code></td>
<td>(none)</td>
<td>
Specifies a <a href="https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath">subpath</a> to be mounted from the volume into the driver pod. For example,
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
<td>(none)</td>
Expand All @@ -804,6 +813,14 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.subPath</code></td>
<td>(none)</td>
<td>
Specifies a <a href="https://kubernetes.io/docs/concepts/storage/volumes/#using-subpath">subpath</a> to be mounted from the volume into the executor pod. For example,
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.subPath=checkpoint</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
<td>false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ private[spark] object Config extends Logging {
// Volume type names as they appear in the `[VolumeType]` segment of a volume property key.
val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
// Key suffixes appended after `[VolumeType].[VolumeName].` to address a single mount setting.
val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
// Suffix for the optional sub-path of the volume that should be mounted.
val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
// Key suffixes for volume-type-specific options.
// NOTE(review): presumably `options.path` applies to hostPath volumes and `options.claimName`
// to persistentVolumeClaim volumes -- confirm against the code that parses these options.
val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ private[spark] case class KubernetesEmptyDirVolumeConf(
/**
 * Describes one volume and how it is mounted into a Spark pod's container.
 *
 * @param volumeName name of the volume; also used as the name of the container's volume mount
 * @param mountPath path inside the container at which the volume is mounted
 * @param mountSubPath sub-path within the volume to mount; the configuration parser supplies
 *                     an empty string when no `mount.subPath` property is set
 * @param mountReadOnly whether the mount is read-only; the configuration parser supplies
 *                      `false` when no `mount.readOnly` property is set
 * @param volumeConf settings specific to the volume type
 * @tparam T concrete type of the volume-type-specific configuration
 */
private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf](
volumeName: String,
mountPath: String,
mountSubPath: String,
mountReadOnly: Boolean,
volumeConf: T)
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@ private[spark] object KubernetesVolumeUtils {
getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"

for {
path <- properties.getTry(pathKey)
volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName)
} yield KubernetesVolumeSpec(
volumeName = volumeName,
mountPath = path,
mountSubPath = properties.get(subPathKey).getOrElse(""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = volumeConf
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ private[spark] class MountVolumesFeatureStep(
val volumeMount = new VolumeMountBuilder()
.withMountPath(spec.mountPath)
.withReadOnly(spec.mountReadOnly)
.withSubPath(spec.mountSubPath)
.withName(spec.volumeName)
.build()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
KubernetesHostPathVolumeConf("/hostPath"))
}

test("Parses subPath correctly") {
  // Describe a single emptyDir volume, including a sub-path, under the "test." prefix.
  val conf = new SparkConf(false)
  Seq(
    "test.emptyDir.volumeName.mount.path" -> "/path",
    "test.emptyDir.volumeName.mount.readOnly" -> "true",
    "test.emptyDir.volumeName.mount.subPath" -> "subPath"
  ).foreach { case (key, value) => conf.set(key, value) }

  // Only one volume is configured, so the first parsed entry is the one under test;
  // `.get` makes the test fail immediately if that entry could not be parsed.
  val parsedSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(conf, "test.").head.get

  assert(parsedSpec.volumeName === "volumeName")
  assert(parsedSpec.mountPath === "/path")
  assert(parsedSpec.mountSubPath === "subPath")
}

test("Parses persistentVolumeClaim volumes correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
)
Expand All @@ -62,6 +63,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
Expand All @@ -83,6 +85,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
false,
KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
)
Expand All @@ -104,6 +107,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
false,
KubernetesEmptyDirVolumeConf(None, None)
)
Expand All @@ -125,12 +129,14 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
val hpVolumeConf = KubernetesVolumeSpec(
"hpVolume",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/hostPath/tmp")
)
val pvcVolumeConf = KubernetesVolumeSpec(
"checkpointVolume",
"/checkpoints",
"",
true,
KubernetesPVCVolumeConf("pvcClaim")
)
Expand All @@ -142,4 +148,77 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
assert(configuredPod.container.getVolumeMounts.size() === 2)
}

test("Mounts subpath on emptyDir") {
  // An emptyDir volume whose "foo" sub-path is mounted read-write at /tmp.
  val emptyDirSpec = KubernetesVolumeSpec(
    volumeName = "testVolume",
    mountPath = "/tmp",
    mountSubPath = "foo",
    mountReadOnly = false,
    volumeConf = KubernetesEmptyDirVolumeConf(None, None))
  val confWithVolume = emptyKubernetesConf.copy(roleVolumes = emptyDirSpec :: Nil)

  val result = new MountVolumesFeatureStep(confWithVolume).configurePod(SparkPod.initialPod())

  assert(result.pod.getSpec.getVolumes.size() === 1)
  // The sub-path has to be carried over onto the container's volume mount.
  val mount = result.container.getVolumeMounts.get(0)
  assert(mount.getMountPath === "/tmp")
  assert(mount.getName === "testVolume")
  assert(mount.getSubPath === "foo")
}

test("Mounts subpath on persistentVolumeClaims") {
  // A PVC-backed volume whose "bar" sub-path is mounted read-only at /tmp.
  val pvcSpec = KubernetesVolumeSpec(
    volumeName = "testVolume",
    mountPath = "/tmp",
    mountSubPath = "bar",
    mountReadOnly = true,
    volumeConf = KubernetesPVCVolumeConf("pvcClaim"))
  val confWithVolume = emptyKubernetesConf.copy(roleVolumes = pvcSpec :: Nil)

  val result = new MountVolumesFeatureStep(confWithVolume).configurePod(SparkPod.initialPod())

  // Pod level: exactly one volume, backed by the expected claim.
  val podVolumes = result.pod.getSpec.getVolumes
  assert(podVolumes.size() === 1)
  val claim = podVolumes.get(0).getPersistentVolumeClaim
  assert(claim.getClaimName === "pvcClaim")

  // Container level: exactly one mount, carrying the configured sub-path.
  val containerMounts = result.container.getVolumeMounts
  assert(containerMounts.size() === 1)
  val mount = containerMounts.get(0)
  assert(mount.getMountPath === "/tmp")
  assert(mount.getName === "testVolume")
  assert(mount.getSubPath === "bar")
}

test("Mounts multiple subpaths") {
  // Two volumes of different types, each mounted through its own sub-path, so the step is
  // exercised with more than one spec at once.
  val emptyDirSpec = KubernetesVolumeSpec(
    "testEmptyDir",
    "/tmp/foo",
    "foo",
    true,
    KubernetesEmptyDirVolumeConf(None, None)
  )
  // Backed by a real PVC configuration so that the "testPVC" volume actually is a
  // persistentVolumeClaim rather than a second emptyDir.
  val pvcSpec = KubernetesVolumeSpec(
    "testPVC",
    "/tmp/bar",
    "bar",
    true,
    KubernetesPVCVolumeConf("pvcClaim")
  )
  val kubernetesConf = emptyKubernetesConf.copy(
    roleVolumes = emptyDirSpec :: pvcSpec :: Nil)
  val step = new MountVolumesFeatureStep(kubernetesConf)
  val configuredPod = step.configurePod(SparkPod.initialPod())

  assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
  // Each mount must keep its own name, path and sub-path, in the order the specs were given.
  val mounts = configuredPod.container.getVolumeMounts
  assert(mounts.size() === 2)
  assert(mounts.get(0).getName === "testEmptyDir")
  assert(mounts.get(0).getMountPath === "/tmp/foo")
  assert(mounts.get(0).getSubPath === "foo")
  assert(mounts.get(1).getName === "testPVC")
  assert(mounts.get(1).getMountPath === "/tmp/bar")
  assert(mounts.get(1).getSubPath === "bar")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,40 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
val volumeSpec = KubernetesVolumeSpec(
"volume",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/path"))
val conf = KubernetesConf(
new SparkConf(false),
KubernetesDriverSpecificConf(
JavaMainAppResource(None),
"test-app",
"main",
Seq.empty),
"prefix",
"appId",
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty,
volumeSpec :: Nil,
hadoopConfSpec = None)
validateStepTypesApplied(
builderUnderTest.buildFromFeatures(conf),
BASIC_STEP_TYPE,
CREDENTIALS_STEP_TYPE,
SERVICE_STEP_TYPE,
LOCAL_DIRS_STEP_TYPE,
MOUNT_VOLUMES_STEP_TYPE,
DRIVER_CMD_STEP_TYPE)
}

test("Apply volumes step if a mount subpath is present.") {
val volumeSpec = KubernetesVolumeSpec(
"volume",
"/tmp",
"foo",
false,
KubernetesHostPathVolumeConf("/path"))
val conf = KubernetesConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
val volumeSpec = KubernetesVolumeSpec(
"volume",
"/tmp",
"",
false,
KubernetesHostPathVolumeConf("/checkpoint"))
val conf = KubernetesConf(
Expand Down