[SPARK-25262][K8S] Better support configurability of Spark scratch space when using Kubernetes #22256

Closed
wants to merge 5 commits
16 changes: 16 additions & 0 deletions docs/running-on-kubernetes.md
@@ -215,6 +215,22 @@ spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.clai

The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below.

## Local Storage

Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager, the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified, a default directory is created and configured appropriately.

`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod.
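For illustration, the sketch below (the paths are hypothetical) shows how the standard `spark.local.dir` property maps onto this behaviour: each comma-separated entry becomes a separate `emptyDir` volume mounted at that path in the driver and executor pods.

```scala
import org.apache.spark.SparkConf

// Two scratch directories => two emptyDir volumes, one mounted at each path.
// The paths below are only examples; any writable container paths will work.
val conf = new SparkConf()
  .set("spark.local.dir", "/tmp/spark-scratch-1,/tmp/spark-scratch-2")
```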

### Using RAM for local storage

As `emptyDir` volumes use the node's backing storage for ephemeral storage, this default behaviour may not be appropriate for some compute environments. For example, if you have diskless nodes with remote storage mounted over a network, having many executors doing IO against this remote storage may actually degrade performance.

In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration, which will cause the `emptyDir` volumes to be configured as `tmpfs`, i.e. RAM-backed volumes. When configured like this, Spark's local storage usage will count towards your pod's memory usage, so you may wish to increase your memory requests via the normal `spark.driver.memory` and `spark.executor.memory` configuration properties.
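As a minimal sketch (the property name comes from this PR; the memory sizes are purely illustrative), enabling tmpfs-backed local storage and allowing extra memory headroom might look like:

```scala
import org.apache.spark.SparkConf

// Back the emptyDir scratch volumes with RAM (tmpfs) and request additional
// memory, since scratch usage now counts towards the pod's memory limit.
val conf = new SparkConf()
  .set("spark.kubernetes.local.dirs.tmpfs", "true")
  .set("spark.driver.memory", "4g")    // illustrative values only
  .set("spark.executor.memory", "8g")
```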

### Using arbitrary volumes for local storage

Alternatively, if using the pod template feature you can provide a volume named `spark-local-dir-N` in your specification, where N is a 1-based index into the entries of your `SPARK_LOCAL_DIRS` variable, and that volume will be used for local storage. This enables you to use a volume type that is appropriate to your compute environment.
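As an illustration of the pod-template approach, the sketch below uses the same fabric8 builders as the tests in this PR to pre-define a `spark-local-dir-1` volume (here a hypothetical `hostPath` pointing at a fast local disk); when such a volume is already present in the spec, the feature step reuses it rather than creating a new `emptyDir` volume.

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder, VolumeBuilder}

// Sketch only: a pod spec that pre-defines the volume backing the first
// SPARK_LOCAL_DIRS entry. The hostPath source and its path are assumptions;
// any volume type appropriate to your environment could be used instead.
val podTemplate: Pod = new PodBuilder()
  .withNewSpec()
    .addToVolumes(new VolumeBuilder()
      .withName("spark-local-dir-1") // 1-based index into SPARK_LOCAL_DIRS
      .withNewHostPath()
        .withPath("/mnt/fast-local-ssd") // hypothetical node-local path
        .endHostPath()
      .build())
    .endSpec()
  .build()
```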

## Introspection and Debugging

These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -225,6 +225,15 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")

val KUBERNETES_LOCAL_DIRS_TMPFS =
ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
.doc("If set to true then emptyDir volumes created to back spark.local.dirs will have " +
"their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " +
"volumes. This may improve performance but scratch space usage will count towards " +
"your pods memory limit so you may wish to request more memory.")
.booleanConf
.createWithDefault(false)

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

@@ -19,9 +19,12 @@ package org.apache.spark.deploy.k8s.features
import java.nio.file.Paths
import java.util.UUID

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._

private[spark] class LocalDirsFeatureStep(
conf: KubernetesConf[_ <: KubernetesRoleSpecificConf],
@@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep(
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)

override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (localDir, index) =>
new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir()
.endEmptyDir()
.build()
val name = s"spark-local-dir-${index + 1}"
// To allow customisation of local dirs backing volumes we should avoid creating
// emptyDir volumes if the volume is already defined in the pod spec
hasVolume(pod, name) match {
case true =>
// For pre-existing volume definitions just re-use the volume
pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get
case false =>
// Create new emptyDir volume
new VolumeBuilder()
.withName(name)
.withNewEmptyDir()
.withMedium(useLocalDirTmpFs match {
case true => "Memory" // Use tmpfs
case false => null // Default - use nodes backing storage
})
.endEmptyDir()
.build()
}
}

val localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
new VolumeMountBuilder()
.withName(localDirVolume.getName)
.withMountPath(localDirPath)
.build()
hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
case true =>
// For pre-existing volume mounts just re-use the mount
pod.container.getVolumeMounts().asScala
.find(m => m.getName.equals(localDirVolume.getName)
&& m.getMountPath.equals(localDirPath))
.get
case false =>
// Create new volume mount
new VolumeMountBuilder()
.withName(localDirVolume.getName)
.withMountPath(localDirPath)
.build()
}
}

// Check for conflicting volume mounts
for (m: VolumeMount <- localDirVolumeMounts) {
if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).nonEmpty) {
throw new SparkException("Conflicting volume mounts defined, pod template attempted to " +
s"mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " +
s"than the expected ${m.getMountPath}")
}
}

val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
.addToVolumes(localDirVolumes: _*)
// Don't want to re-add volumes that already existed in the incoming spec
// as duplicate definitions will lead to K8S API errors
.addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*)
Contributor:
Checking current volumes in a feature step isn't consistent with the additive design of the feature builder pattern. @mccheah to comment

Contributor:
All of this conflicting volume mount and conflicting volume handling seems out of place here. If we're anticipating using the pod template file, keep in mind that the pod template feature is specifically not designed to do any validation. What kinds of errors are we hoping to avoid by doing the deduplication here?

Member Author:
The implementation is still additive in that it will add to existing elements in the pod spec as needed but respect what is already present.

If your pod spec contains duplicate volumes/volume mounts then K8S will reject it as invalid, e.g.:

The Pod "rvesse-test" is invalid: spec.volumes[1].name: Duplicate value: "spark-local-dirs-1"

Therefore it is necessary to explicitly avoid duplicating things already present in the template.

If the aim is to replace adding further config options with the pod template feature then the existing builders do need to be more intelligent in what they do to avoid generating invalid pod specs. This is regardless of whether the template feature is opinionated about validation: even if the template feature doesn't do validation, Spark code itself should ensure that it generates valid specs as far as it is able to. Obviously it can't detect every possible invalid spec that it might generate if the templates aren't being validated, but it can avoid introducing easily avoidable invalid specs.

Contributor:
> This is regardless of whether the template feature is opinionated about validation: even if the template feature doesn't do validation, Spark code itself should ensure that it generates valid specs as far as it is able to.

This is a stance that, as far as I'm aware, we specifically chose not to take in the pod template feature. If one is using the pod template feature then Spark won't provide any guarantees that the pod it makes will be well-formed. When spark-submit deploys the pod to the cluster, the API will return a clear enough error informing the user to make the appropriate corrections to their pod template.

@onursatici I just checked the pod template files PR and didn't see this specifically called out - should this be documented?

Contributor:
@mccheah yeap we should document that, will add

Member Author:
> This is a stance that, as far as I'm aware, we specifically chose not to take in the pod template feature. If one is using the pod template feature then Spark won't provide any guarantees that the pod it makes will be well-formed. When spark-submit deploys the pod to the cluster, the API will return a clear enough error informing the user to make the appropriate corrections to their pod template.

Sure, but we still need to be realistic about how the template feature will be used. It is supposed to enable power users to customise the pods for their environments. If there is an area like this where there is a clear use case to allow customisation, we should be enabling that rather than saying "sorry, we're going to generate invalid pods regardless". Obviously the power user is assuming the risk of creating a pod template that meaningfully combines with Spark's generated pod to yield a valid runtime environment.

Clearly my stance here is controversial and likely needs a broader discussion on the dev list.

I can reduce this PR to just the config to enable tmpfs backed emptyDir volumes if that is preferred?

Contributor:
Yeah, that might be better @rvesse

Member Author:
Ok, will do that Monday.

FYI I notice @onursatici has now made some similar tweaks in his latest commit - a4fde0c - notice several feature steps there now have editOrNewX() or addToX() so that they combine with, rather than override, the template.

Contributor:
This is different in that we're looking for specific volumes that have been set up by previous feature steps or outside logic. Preferably every step is self-contained, in that it doesn't have to look up specific values set by previous steps.

For example, this logic would break if we applied the templating after this step, or if a different step after this one added the volumes that are being looked up here.

Whereas editOrNew and addTo... at worst only change the ordering of some of the fields depending on when the step is invoked in the sequence.

Member Author:
@mccheah @ifilonenko OK, I have opened PR #22323 with just the tmpfs enabling changes

.endSpec()
.build()
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
.addNewEnv()
.withName("SPARK_LOCAL_DIRS")
.withValue(resolvedLocalDirs.mkString(","))
.endEnv()
.addToVolumeMounts(localDirVolumeMounts: _*)
// Don't want to re-add volume mounts that already existed in the incoming spec
// as duplicate definitions will lead to K8S API errors
.addToVolumeMounts(localDirVolumeMounts
.filter(m => !hasVolumeMount(pod, m.getName, m.getMountPath)): _*)
.build()
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty

def hasVolume(pod: SparkPod, name: String): Boolean = {
pod.pod.getSpec().getVolumes().asScala.exists(v => v.getName.equals(name))
}

def hasVolumeMount(pod: SparkPod, name: String, path: String): Boolean = {
pod.container.getVolumeMounts().asScala
.exists(m => m.getName.equals(name) && m.getMountPath.equals(path))
}

def hasConflictingVolumeMount(pod: SparkPod, name: String, path: String): Seq[VolumeMount] = {
// A volume mount is considered conflicting if it matches one, and only one, of the
// name/path of a volume mount we are creating
pod.container.getVolumeMounts().asScala
.filter(m => (m.getName.equals(name) && !m.getMountPath.equals(path)) ||
(m.getMountPath.equals(path) && !m.getName.equals(name)))
}
}
@@ -16,12 +16,15 @@
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder}
import org.mockito.Mockito
import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._

class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
private val defaultLocalDir = "/var/data/default-local-dir"
@@ -111,4 +114,146 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
.withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
.build())
}

test("Use predefined volume for local dirs") {
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
val initialPod =
new PodBuilder(SparkPod.initialPod().pod)
.editSpec()
.addToVolumes(new VolumeBuilder()
.withName("spark-local-dir-1")
.withNewConfigMap()
.withName("foo")
.endConfigMap()
.build())
.endSpec()
.build()

val configuredPod = stepUnderTest.configurePod(new SparkPod(initialPod,
new ContainerBuilder().build()))
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
new VolumeBuilder()
.withName(s"spark-local-dir-1")
.withNewConfigMap()
.withName("foo")
.endConfigMap()
.build())
assert(configuredPod.container.getVolumeMounts.size === 1)
assert(configuredPod.container.getVolumeMounts.get(0) ===
new VolumeMountBuilder()
.withName(s"spark-local-dir-1")
.withMountPath(defaultLocalDir)
.build())
assert(configuredPod.container.getEnv.size === 1)
assert(configuredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue(defaultLocalDir)
.build())
}

test("Use predefined volume and mount for local dirs") {
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
val initialPod =
new PodBuilder(SparkPod.initialPod().pod)
.editSpec()
.addToVolumes(new VolumeBuilder()
.withName("spark-local-dir-1")
.withNewConfigMap()
.withName("foo")
.endConfigMap()
.build())
.endSpec()
.build()
val initialContainer =
new ContainerBuilder()
.withVolumeMounts(new VolumeMountBuilder()
.withName("spark-local-dir-1")
.withMountPath(defaultLocalDir)
.build())
.build()

val configuredPod = stepUnderTest.configurePod(new SparkPod(initialPod,
initialContainer))
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
new VolumeBuilder()
.withName(s"spark-local-dir-1")
.withNewConfigMap()
.withName("foo")
.endConfigMap()
.build())
assert(configuredPod.container.getVolumeMounts.size === 1)
assert(configuredPod.container.getVolumeMounts.get(0) ===
new VolumeMountBuilder()
.withName(s"spark-local-dir-1")
.withMountPath(defaultLocalDir)
.build())
assert(configuredPod.container.getEnv.size === 1)
assert(configuredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue(defaultLocalDir)
.build())
}

test("Using conflicting volume mount for local dirs should error") {
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
val initialPod =
new PodBuilder(SparkPod.initialPod().pod)
.editSpec()
.addToVolumes(new VolumeBuilder()
.withName("spark-local-dir-1")
.withNewConfigMap()
.withName("foo")
.endConfigMap()
.build())
.endSpec()
.build()
val initialContainer =
new ContainerBuilder()
.withVolumeMounts(new VolumeMountBuilder()
.withName("spark-local-dir-1")
.withMountPath("/bad/path")
.build())
.build()

a [SparkException] should be thrownBy stepUnderTest.configurePod(new SparkPod(initialPod,
initialContainer))
}

test("Use tmpfs to back default local dir") {
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
Mockito.doReturn(true).when(sparkConf).get(KUBERNETES_LOCAL_DIRS_TMPFS)
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
new VolumeBuilder()
.withName(s"spark-local-dir-1")
.withNewEmptyDir()
.withMedium("Memory")
.endEmptyDir()
.build())
assert(configuredPod.container.getVolumeMounts.size === 1)
assert(configuredPod.container.getVolumeMounts.get(0) ===
new VolumeMountBuilder()
.withName(s"spark-local-dir-1")
.withMountPath(defaultLocalDir)
.build())
assert(configuredPod.container.getEnv.size === 1)
assert(configuredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue(defaultLocalDir)
.build())
}
}