Skip to content

Commit 08fe944

Browse files
chenchunash211
authored andcommitted
Bypass init-containers when possible (#348)
1 parent a6291c6 commit 08fe944

File tree

5 files changed

+80
-44
lines changed

5 files changed

+80
-44
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -156,31 +156,33 @@ private[spark] class Client(
156156
.addToContainers(driverContainer)
157157
.endSpec()
158158

159-
val maybeSubmittedDependencyUploader = initContainerComponentsProvider
160-
.provideInitContainerSubmittedDependencyUploader(allDriverLabels)
161-
val maybeSubmittedResourceIdentifiers = maybeSubmittedDependencyUploader.map { uploader =>
159+
val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider
160+
.provideInitContainerSubmittedDependencyUploader(allDriverLabels)
161+
.map { uploader =>
162162
SubmittedResources(uploader.uploadJars(), uploader.uploadFiles())
163163
}
164-
val maybeSecretBuilder = initContainerComponentsProvider
165-
.provideSubmittedDependenciesSecretBuilder(
166-
maybeSubmittedResourceIdentifiers.map(_.secrets()))
167-
val maybeSubmittedDependenciesSecret = maybeSecretBuilder.map(_.build())
168-
val initContainerConfigMap = initContainerComponentsProvider
169-
.provideInitContainerConfigMapBuilder(maybeSubmittedResourceIdentifiers.map(_.ids()))
170-
.build()
171-
val podWithInitContainer = initContainerComponentsProvider
172-
.provideInitContainerBootstrap()
173-
.bootstrapInitContainerAndVolumes(driverContainer.getName, basePod)
164+
val maybeSubmittedDependenciesSecret = initContainerComponentsProvider
165+
.provideSubmittedDependenciesSecretBuilder(
166+
maybeSubmittedResourceIdentifiers.map(_.secrets()))
167+
.map(_.build())
174168

175169
val containerLocalizedFilesResolver = initContainerComponentsProvider
176-
.provideContainerLocalizedFilesResolver()
170+
.provideContainerLocalizedFilesResolver()
177171
val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
178172
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()
179173

180-
val executorInitContainerConfiguration = initContainerComponentsProvider
181-
.provideExecutorInitContainerConfiguration()
182-
val sparkConfWithExecutorInit = executorInitContainerConfiguration
183-
.configureSparkConfForExecutorInitContainer(sparkConf)
174+
val initContainerBundler = initContainerComponentsProvider
175+
.provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()),
176+
resolvedSparkJars ++ resolvedSparkFiles)
177+
178+
val podWithInitContainer = initContainerBundler.map(
179+
_.sparkPodInitContainerBootstrap
180+
.bootstrapInitContainerAndVolumes(driverContainer.getName, basePod))
181+
.getOrElse(basePod)
182+
val sparkConfWithExecutorInit = initContainerBundler.map(
183+
_.executorInitContainerConfiguration
184+
.configureSparkConfForExecutorInitContainer(sparkConf))
185+
.getOrElse(sparkConf)
184186
val credentialsMounter = kubernetesCredentialsMounterProvider
185187
.getDriverPodKubernetesCredentialsMounter()
186188
val credentialsSecret = credentialsMounter.createCredentialsSecret()
@@ -224,7 +226,8 @@ private[spark] class Client(
224226
.watch(loggingPodStatusWatcher)) { _ =>
225227
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
226228
try {
227-
val driverOwnedResources = Seq(initContainerConfigMap) ++
229+
val driverOwnedResources = initContainerBundler.map(
230+
_.sparkInitContainerConfigMap).toSeq ++
228231
maybeSubmittedDependenciesSecret.toSeq ++
229232
credentialsSecret.toSeq
230233
val driverPodOwnerReference = new OwnerReferenceBuilder()

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes.submit
1818

19+
import io.fabric8.kubernetes.api.model.ConfigMap
20+
1921
import org.apache.spark.{SparkConf, SSLOptions}
2022
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
2123
import org.apache.spark.deploy.kubernetes.config._
@@ -30,17 +32,15 @@ import org.apache.spark.util.Utils
3032
*/
3133
private[spark] trait DriverInitContainerComponentsProvider {
3234

33-
def provideInitContainerConfigMapBuilder(
34-
maybeSubmittedResourceIds: Option[SubmittedResourceIds])
35-
: SparkInitContainerConfigMapBuilder
3635
def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver
37-
def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration
3836
def provideInitContainerSubmittedDependencyUploader(
3937
driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader]
4038
def provideSubmittedDependenciesSecretBuilder(
4139
maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets])
4240
: Option[SubmittedDependencySecretBuilder]
4341
def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap
42+
def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds],
43+
uris: Iterable[String]): Option[InitContainerBundle]
4444
}
4545

4646
private[spark] class DriverInitContainerComponentsProviderImpl(
@@ -105,9 +105,8 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
105105
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
106106
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
107107

108-
override def provideInitContainerConfigMapBuilder(
109-
maybeSubmittedResourceIds: Option[SubmittedResourceIds])
110-
: SparkInitContainerConfigMapBuilder = {
108+
private def provideInitContainerConfigMap(
109+
maybeSubmittedResourceIds: Option[SubmittedResourceIds]): ConfigMap = {
111110
val submittedDependencyConfigPlugin = for {
112111
stagingServerUri <- maybeResourceStagingServerUri
113112
jarsResourceId <- maybeSubmittedResourceIds.map(_.jarsResourceId)
@@ -136,15 +135,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
136135
filesDownloadPath,
137136
configMapName,
138137
configMapKey,
139-
submittedDependencyConfigPlugin)
138+
submittedDependencyConfigPlugin).build()
140139
}
141140

142141
override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
143142
new ContainerLocalizedFilesResolverImpl(
144143
sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath)
145144
}
146145

147-
override def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
146+
private def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
148147
new ExecutorInitContainerConfigurationImpl(
149148
maybeSecretName,
150149
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
@@ -202,4 +201,16 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
202201
configMapKey,
203202
resourceStagingServerSecretPlugin)
204203
}
204+
205+
override def provideInitContainerBundle(
206+
maybeSubmittedResourceIds: Option[SubmittedResourceIds],
207+
uris: Iterable[String]): Option[InitContainerBundle] = {
208+
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
209+
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
210+
if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
211+
Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds),
212+
provideInitContainerBootstrap(),
213+
provideExecutorInitContainerConfiguration()))
214+
} else None
215+
}
205216
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit
18+
19+
import io.fabric8.kubernetes.api.model.ConfigMap
20+
21+
import org.apache.spark.deploy.kubernetes.{SparkPodInitContainerBootstrap}
22+
23+
case class InitContainerBundle(
24+
sparkInitContainerConfigMap: ConfigMap,
25+
sparkPodInitContainerBootstrap: SparkPodInitContainerBootstrap,
26+
executorInitContainerConfiguration: ExecutorInitContainerConfiguration)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesFileUtils.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ private[spark] object KubernetesFileUtils {
3333
filterUriStringsByScheme(uris, _ == "local")
3434
}
3535

36+
def getNonContainerLocalFiles(uris: Iterable[String]): Iterable[String] = {
37+
filterUriStringsByScheme(uris, _ != "local")
38+
}
39+
3640
def getOnlySubmitterLocalFiles(uris: Iterable[String]): Iterable[String] = {
3741
filterUriStringsByScheme(uris, _ == "file")
3842
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
123123
private val CREDENTIALS_SET_CONF = "spark.kubernetes.driverCredentials.provided"
124124
private val CREDENTIALS_SET_ANNOTATION = "credentials-set"
125125

126-
@Mock
127-
private var initContainerConfigMapBuilder: SparkInitContainerConfigMapBuilder = _
128126
@Mock
129127
private var containerLocalizedFilesResolver: ContainerLocalizedFilesResolver = _
130128
@Mock
@@ -173,12 +171,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
173171
})
174172
when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver())
175173
.thenReturn(containerLocalizedFilesResolver)
176-
when(initContainerComponentsProvider.provideExecutorInitContainerConfiguration())
177-
.thenReturn(executorInitContainerConfiguration)
178174
when(submittedDependenciesSecretBuilder.build())
179175
.thenReturn(INIT_CONTAINER_SECRET)
180-
when(initContainerConfigMapBuilder.build())
181-
.thenReturn(INIT_CONTAINER_CONFIG_MAP)
182176
when(kubernetesClient.pods()).thenReturn(podOps)
183177
when(podOps.create(any())).thenAnswer(new Answer[Pod] {
184178
override def answer(invocation: InvocationOnMock): Pod = {
@@ -214,9 +208,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
214208
when(initContainerComponentsProvider
215209
.provideSubmittedDependenciesSecretBuilder(Some(SUBMITTED_RESOURCES.secrets())))
216210
.thenReturn(Some(submittedDependenciesSecretBuilder))
217-
when(initContainerComponentsProvider
218-
.provideInitContainerConfigMapBuilder(Some(SUBMITTED_RESOURCES.ids())))
219-
.thenReturn(initContainerConfigMapBuilder)
211+
when(initContainerComponentsProvider.provideInitContainerBundle(Some(SUBMITTED_RESOURCES.ids()),
212+
RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES))
213+
.thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP,
214+
initContainerBootstrap, executorInitContainerConfiguration)))
220215
runAndVerifyDriverPodHasCorrectProperties()
221216
val resourceListArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
222217
verify(kubernetesClient).resourceList(resourceListArgumentCaptor.capture())
@@ -232,8 +227,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
232227
verifyConfigMapWasCreated(createdResources)
233228
verify(submittedDependencyUploader).uploadJars()
234229
verify(submittedDependencyUploader).uploadFiles()
235-
verify(initContainerComponentsProvider)
236-
.provideInitContainerConfigMapBuilder(Some(SUBMITTED_RESOURCES.ids()))
237230
verify(initContainerComponentsProvider)
238231
.provideSubmittedDependenciesSecretBuilder(Some(SUBMITTED_RESOURCES.secrets()))
239232
}
@@ -250,8 +243,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
250243
verifyConfigMapWasCreated(createdResources)
251244
verify(submittedDependencyUploader, times(0)).uploadJars()
252245
verify(submittedDependencyUploader, times(0)).uploadFiles()
253-
verify(initContainerComponentsProvider)
254-
.provideInitContainerConfigMapBuilder(None)
255246
verify(initContainerComponentsProvider)
256247
.provideSubmittedDependenciesSecretBuilder(None)
257248
}
@@ -321,9 +312,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
321312
when(initContainerComponentsProvider
322313
.provideSubmittedDependenciesSecretBuilder(None))
323314
.thenReturn(None)
324-
when(initContainerComponentsProvider
325-
.provideInitContainerConfigMapBuilder(None))
326-
.thenReturn(initContainerConfigMapBuilder)
315+
when(initContainerComponentsProvider.provideInitContainerBundle(None, RESOLVED_SPARK_JARS ++
316+
RESOLVED_SPARK_FILES))
317+
.thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP,
318+
initContainerBootstrap, executorInitContainerConfiguration)))
327319
}
328320

329321
private def expectationsForNoMountedCredentials(): Unit = {

0 commit comments

Comments
 (0)