Skip to content

Commit 40d08cd

Browse files
author
Andrew Korzhuev
committed
[SPARK-23668][K8S] Add config option for passing through k8s Pod.spec.imagePullSecrets
Pass through the `imagePullSecrets` option to the k8s pod in order to allow user to access private image registries. See https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/ Unit tests + manual testing. Manual testing procedure: 1. Have private image registry. 2. Spark-submit application with no `spark.kubernetes.imagePullSecret` set. Do `kubectl describe pod ...`. See the error message: ``` Error syncing pod, skipping: failed to "StartContainer" for "spark-kubernetes-driver" with ErrImagePull: "rpc error: code = 2 desc = Error: Status 400 trying to pull repository ...: \"{\\n \\\"errors\\\" : [ {\\n \\\"status\\\" : 400,\\n \\\"message\\\" : \\\"Unsupported docker v1 repository request for '...'\\\"\\n } ]\\n}\"" ``` 3. Create secret `kubectl create secret docker-registry ...` 4. Spark-submit with `spark.kubernetes.imagePullSecret` set to the new secret. See that deployment was successful. Author: Andrew Korzhuev <andrew.korzhuev@klarna.com> Author: Andrew Korzhuev <korzhuev@andrusha.me> Closes apache#20811 from andrusha/spark-23668-image-pull-secrets.
1 parent 621234a commit 40d08cd

File tree

7 files changed

+77
-3
lines changed

7 files changed

+77
-3
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ private[spark] object Config extends Logging {
5454
.checkValues(Set("Always", "Never", "IfNotPresent"))
5555
.createWithDefault("IfNotPresent")
5656

57+
// Optional comma-separated list of Kubernetes secret names. When set, the names are
// parsed by KubernetesUtils.parseImagePullSecrets and attached to both the driver and
// executor pod specs as imagePullSecrets, letting kubelet pull container images from
// private registries (see the Kubernetes docs on pulling from a private registry).
val IMAGE_PULL_SECRETS =
  ConfigBuilder("spark.kubernetes.container.image.pullSecrets")
    .doc("Comma separated list of the Kubernetes secrets used " +
      "to access private image registries.")
    .stringConf
    .createOptional
63+
5764
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
5865
"spark.kubernetes.authenticate.driver"
5966
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s
1818

1919
import java.io.File
2020

21-
import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
21+
import io.fabric8.kubernetes.api.model._
2222

2323
import org.apache.spark.SparkConf
2424
import org.apache.spark.util.Utils
@@ -39,6 +39,17 @@ private[spark] object KubernetesUtils {
3939
sparkConf.getAllWithPrefix(prefix).toMap
4040
}
4141

42+
/**
 * Converts an optional comma-separated string of secret names into the list of
 * `LocalObjectReference`s expected by the Kubernetes pod spec's imagePullSecrets
 * field. Each name is trimmed of surrounding whitespace; `None` yields an empty list.
 */
def parseImagePullSecrets(imagePullSecrets: Option[String]): List[LocalObjectReference] = {
  imagePullSecrets
    .map(_.split(',').toList.map(name => new LocalObjectReference(name.trim)))
    .getOrElse(Nil)
}
52+
4253
def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
4354
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
4455
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.steps
1818

1919
import scala.collection.JavaConverters._
2020

21-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
21+
import io.fabric8.kubernetes.api.model._
2222

2323
import org.apache.spark.{SparkConf, SparkException}
2424
import org.apache.spark.deploy.k8s.Config._
@@ -50,6 +50,8 @@ private[spark] class BasicDriverConfigurationStep(
5050
.get(DRIVER_CONTAINER_IMAGE)
5151
.getOrElse(throw new SparkException("Must specify the driver container image"))
5252

53+
private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
54+
5355
// CPU settings
5456
private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
5557
private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
@@ -136,6 +138,8 @@ private[spark] class BasicDriverConfigurationStep(
136138
.addToArgs("driver")
137139
.build()
138140

141+
val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
142+
139143
val baseDriverPod = new PodBuilder(driverSpec.driverPod)
140144
.editOrNewMetadata()
141145
.withName(driverPodName)
@@ -145,6 +149,7 @@ private[spark] class BasicDriverConfigurationStep(
145149
.withNewSpec()
146150
.withRestartPolicy("Never")
147151
.withNodeSelector(nodeSelector.asJava)
152+
.withImagePullSecrets(parsedImagePullSecrets.asJava)
148153
.endSpec()
149154
.build()
150155

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ private[spark] class ExecutorPodFactory(
7676
.get(EXECUTOR_CONTAINER_IMAGE)
7777
.getOrElse(throw new SparkException("Must specify the executor container image"))
7878
private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
79+
private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
7980
private val blockManagerPort = sparkConf
8081
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
8182

@@ -108,6 +109,8 @@ private[spark] class ExecutorPodFactory(
108109
nodeToLocalTaskCount: Map[String, Int]): Pod = {
109110
val name = s"$executorPodNamePrefix-exec-$executorId"
110111

112+
val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)
113+
111114
// hostname must be no longer than 63 characters, so take the last 63 characters of the pod
112115
// name as the hostname. This preserves uniqueness since the end of name contains
113116
// executorId
@@ -202,6 +205,7 @@ private[spark] class ExecutorPodFactory(
202205
.withHostname(hostname)
203206
.withRestartPolicy("Never")
204207
.withNodeSelector(nodeSelector.asJava)
208+
.withImagePullSecrets(parsedImagePullSecrets.asJava)
205209
.endSpec()
206210
.build()
207211

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s
18+
19+
import io.fabric8.kubernetes.api.model.LocalObjectReference
20+
21+
import org.apache.spark.SparkFunSuite
22+
23+
class KubernetesUtilsTest extends SparkFunSuite {

  test("testParseImagePullSecrets") {
    // No configured secrets produces an empty list.
    assert(KubernetesUtils.parseImagePullSecrets(None) === Nil)

    // A single name becomes a single-element reference list.
    val single = KubernetesUtils.parseImagePullSecrets(Some("imagePullSecret"))
    assert(single === List(new LocalObjectReference("imagePullSecret")))

    // Comma-separated names are split and trimmed of surrounding whitespace.
    val multiple = KubernetesUtils.parseImagePullSecrets(Some("s1, s2 , s3,s4"))
    assert(multiple.map(_.getName) === List("s1", "s2", "s3", "s4"))
  }

}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
5151
.set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
5252
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
5353
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
54+
.set(IMAGE_PULL_SECRETS, "imagePullSecret1, imagePullSecret2")
5455

5556
val submissionStep = new BasicDriverConfigurationStep(
5657
APP_ID,
@@ -106,7 +107,12 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
106107
CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
107108
SPARK_APP_NAME_ANNOTATION -> APP_NAME)
108109
assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
109-
assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")
110+
111+
val driverPodSpec = preparedDriverSpec.driverPod.getSpec
112+
assert(driverPodSpec.getRestartPolicy === "Never")
113+
assert(driverPodSpec.getImagePullSecrets.size() === 2)
114+
assert(driverPodSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
115+
assert(driverPodSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")
110116

111117
val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
112118
val expectedSparkConf = Map(

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
3535
private val driverPodUid: String = "driver-uid"
3636
private val executorPrefix: String = "base"
3737
private val executorImage: String = "executor-image"
38+
private val imagePullSecrets: String = "imagePullSecret1, imagePullSecret2"
3839
private val driverPod = new PodBuilder()
3940
.withNewMetadata()
4041
.withName(driverPodName)
@@ -55,6 +56,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
5556
.set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
5657
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
5758
.set(CONTAINER_IMAGE, executorImage)
59+
.set(IMAGE_PULL_SECRETS, imagePullSecrets)
5860
}
5961

6062
test("basic executor pod has reasonable defaults") {
@@ -75,6 +77,9 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
7577
assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1)
7678
assert(executor.getSpec.getContainers.get(0).getResources
7779
.getLimits.get("memory").getAmount === "1408Mi")
80+
assert(executor.getSpec.getImagePullSecrets.size() === 2)
81+
assert(executor.getSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
82+
assert(executor.getSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")
7883

7984
// The pod has no node selector, volumes.
8085
assert(executor.getSpec.getNodeSelector.isEmpty)

0 commit comments

Comments
 (0)