[SPARK-22839][K8S] Remove the use of init-container for downloading remote dependencies #20669

Closed
wants to merge 8 commits
9 changes: 7 additions & 2 deletions bin/docker-image-tool.sh
@@ -63,9 +63,11 @@ function build {
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi

local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
Contributor

Any particular reason why this is tied to this change?

Contributor Author

Uhm, this technically could be a separate PR. I was initially piggy-backing off the work of Marcelo in being able to build Dockerfiles via -f.

Contributor

Can we separate this out?

Contributor Author

Not a problem.


docker build "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$IMG_PATH/spark/Dockerfile" .
-f "$DOCKERFILE" .
}

function push {
@@ -83,6 +85,7 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
-f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
@@ -112,10 +115,12 @@ fi

REPO=
TAG=
while getopts mr:t: option
DOCKERFILE=
while getopts f:mr:t: option
do
case "${option}"
in
f) DOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
m)
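For context, a minimal usage sketch of the new `-f` option (illustrative only, not part of this diff); the repository, tag, and Dockerfile path below are placeholders:

```bash
# Build the Spark image from a custom Dockerfile instead of the bundled one
# (run from an unpacked Spark distribution; <repo>, my-tag, and the path are placeholders).
./bin/docker-image-tool.sh -r <repo> -t my-tag -f /path/to/custom/Dockerfile build

# Push the image that was just built to the registry referenced by -r.
./bin/docker-image-tool.sh -r <repo> -t my-tag push
```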
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -320,8 +320,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
71 changes: 2 additions & 69 deletions docs/running-on-kubernetes.md
@@ -126,29 +126,6 @@ Those dependencies can be added to the classpath by referencing them with `local://`
dependencies in custom-built Docker images in `spark-submit`. Note that using application dependencies from the submission
client's local file system is currently not yet supported.
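As an illustrative sketch (not part of this diff) of the `local://` scheme described above, a submission that references a jar baked into the image might look like the following; the API server address, image name, and jar path are placeholders:

```bash
$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar
```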


### Using Remote Dependencies
When there are application dependencies hosted in remote locations like HDFS or HTTP servers, the driver and executor pods
need a Kubernetes [init-container](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/) for downloading
the dependencies so the driver and executor containers can use them locally.

The init-container handles remote dependencies specified in `spark.jars` (or the `--jars` option of `spark-submit`) and
`spark.files` (or the `--files` option of `spark-submit`). It also handles remotely hosted main application resources, e.g.,
the main application jar. The following shows an example of using remote dependencies with the `spark-submit` command:

```bash
$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--jars https://path/to/dependency1.jar,https://path/to/dependency2.jar
--files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
https://path/to/examples.jar
```

## Secret Management
Kubernetes [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/) can be used to provide credentials for a
Spark application to access secured services. To mount a user-specified secret into the driver container, users can use
@@ -163,10 +140,6 @@ namespace as that of the driver and executor pods. For example, to mount a secret
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
```

Note that if an init-container is used, any secret mounted into the driver container will also be mounted into the
init-container of the driver. Similarly, any secret mounted into an executor container will also be mounted into the
init-container of the executor.

## Introspection and Debugging

These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -604,60 +577,20 @@ specific to Spark on Kubernetes.
the Driver process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.jarsDownloadDir</code></td>
<td><code>/var/spark-data/spark-jars</code></td>
<td>
Location to download jars to in the driver and executors.
This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.filesDownloadDir</code></td>
<td><code>/var/spark-data/spark-files</code></td>
<td>
Location to download files to in the driver and executors.
This directory must be empty and will be mounted as an empty directory volume on the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.timeout</code></td>
<td>300s</td>
<td>
Timeout in seconds before aborting the attempt to download and unpack dependencies from remote locations into
the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.mountDependencies.maxSimultaneousDownloads</code></td>
<td>5</td>
<td>
Maximum number of remote dependencies to download simultaneously in a driver or executor pod.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.initContainer.image</code></td>
<td><code>(value of spark.kubernetes.container.image)</code></td>
<td>
Custom container image for the init container of both driver and executors.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.secrets.[SecretName]</code></td>
<td>(none)</td>
<td>
Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the driver pod on the path specified in the value. For example,
<code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
the secret will also be added to the init-container in the driver pod.
<code>spark.kubernetes.driver.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.secrets.[SecretName]</code></td>
<td>(none)</td>
<td>
Add the <a href="https://kubernetes.io/docs/concepts/configuration/secret/">Kubernetes Secret</a> named <code>SecretName</code> to the executor pod on the path specified in the value. For example,
<code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>. Note that if an init-container is used,
the secret will also be added to the init-container in the executor pod.
<code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
</table>
48 changes: 48 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/SparkRemoteFileTest.scala
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples

import java.io.File

import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

/** Usage: SparkRemoteFileTest [file] */
object SparkRemoteFileTest {
Contributor

Hm, what is the purpose of this specific example?

Contributor Author

To test the presence of a remote file being mounted on the executors via the spark-submit run by the driver. Should I add a Javadoc? HDFSTest.scala didn't include one, but I can add one if necessary.

def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: SparkRemoteFileTest <file>")
System.exit(1)
}
val spark = SparkSession
.builder()
.appName("SparkRemoteFileTest")
.getOrCreate()
val sc = spark.sparkContext
val rdd = sc.parallelize(Seq(1)).map(_ => {
val localLocation = SparkFiles.get(args(0))
println(s"${args(0)} is stored at: $localLocation")
new File(localLocation).isFile
})
val truthCheck = rdd.collect().head
println(s"Mounting of ${args(0)} was $truthCheck")
spark.stop()
}
}
// scalastyle:on println
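As an illustrative sketch (not part of this diff), this example would typically be run against a Kubernetes cluster with a remotely hosted file passed via `--files`, which is now handled by the spark-submit run in the driver rather than by an init-container; the API server address, image, file URL, and examples jar path are placeholders:

```bash
$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-remote-file-test \
    --class org.apache.spark.examples.SparkRemoteFileTest \
    --files https://host/path/to/file1.txt \
    --conf spark.kubernetes.container.image=<spark-image> \
    https://path/to/examples.jar file1.txt
```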
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -79,6 +79,12 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_DRIVER_SUBMIT_CHECK =
ConfigBuilder("spark.kubernetes.submitInDriver")
.internal()
Contributor

Nit: The indentation looks slightly off here.

.booleanConf
.createOptional

val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for each executor pod")
@@ -135,73 +141,6 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

val JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
.doc("Location to download jars to in the driver and executors. When using " +
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
"volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-jars")

val FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using " +
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
"volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-files")

val INIT_CONTAINER_IMAGE =
ConfigBuilder("spark.kubernetes.initContainer.image")
.doc("Image for the driver and executor's init-container for downloading dependencies.")
.fallbackConf(CONTAINER_IMAGE)

val INIT_CONTAINER_MOUNT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountDependencies.timeout")
.doc("Timeout before aborting the attempt to download and unpack dependencies from remote " +
"locations into the driver and executor pods.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300)

val INIT_CONTAINER_MAX_THREAD_POOL_SIZE =
ConfigBuilder("spark.kubernetes.mountDependencies.maxSimultaneousDownloads")
.doc("Maximum number of remote dependencies to download simultaneously in a driver or " +
"executor pod.")
.intConf
.createWithDefault(5)

val INIT_CONTAINER_REMOTE_JARS =
ConfigBuilder("spark.kubernetes.initContainer.remoteJars")
.doc("Comma-separated list of jar URIs to download in the init-container. This is " +
"calculated from spark.jars.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_REMOTE_FILES =
ConfigBuilder("spark.kubernetes.initContainer.remoteFiles")
.doc("Comma-separated list of file URIs to download in the init-container. This is " +
"calculated from spark.files.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_CONFIG_MAP_NAME =
ConfigBuilder("spark.kubernetes.initContainer.configMapName")
.doc("Name of the config map to use in the init-container that retrieves submitted files " +
"for the executor.")
.internal()
.stringConf
.createOptional

val INIT_CONTAINER_CONFIG_MAP_KEY_CONF =
ConfigBuilder("spark.kubernetes.initContainer.configMapKey")
.doc("Key for the entry in the init container config map for submitted files that " +
"corresponds to the properties for this init-container.")
.internal()
.stringConf
.createOptional

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -63,22 +63,13 @@ private[spark] object Constants {
val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
val ENV_CLASSPATH = "SPARK_CLASSPATH"
val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"

// Bootstrapping dependencies with the init-container
val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files-volume"
val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
val INIT_CONTAINER_PROPERTIES_FILE_PATH =
s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
// Spark app configs for containers
val SPARK_CONF_VOLUME = "spark-conf-volume"
val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
val SPARK_CONF_FILE_NAME = "spark.properties"
val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"